Logiciel

Le traceur GPS Polaris diffère de la plupart de ses concurrents, ce qui devrait contenter aussi bien l’utilisateur que le développeur. Polaris est une plateforme ouverte, ce qui est rarement le cas des traceurs GPS. Il offre bien sûr les mêmes commodités de base que d’autres, par exemple un micrologiciel préchargé assurant les fonctions essentielles de suivi des véhicules, mais la grande différence ici est que vous pouvez écrire vous-même ce micrologiciel, qui plus est sans être restreint au C/C++.

Polaris est en effet livré avec le logiciel médiateur (middleware) Zerynth à environnement Python. Sa licence d’utilisation est intégrée dans le microcontrôleur. Pour profiter des outils logiciels de Zerynth, il vous faudra télécharger l’EDI Zerynth Studio depuis le site de Zerynth. Notez que la licence Zerynth pour Python n’est accordée et enregistrée sur l’ordinateur hôte que si le premier micrologiciel chargé est écrit en Python. En clair, vous pourriez perdre la licence si au premier jour d'utilisation vous écrasiez le contenu de la mémoire du microcontrôleur avec un logiciel autre que Zerynth Studio, par exemple avec l’EDI Arduino. Soulignons également que l’utilisation de Zerynth Studio nécessite l’ouverture d’un compte Zerynth (gratuit).
 


L’éditeur de texte de Zerynth Studio n’étant pas le plus élaboré des éditeurs, l’utilisateur lui préférera sans doute son programe habituel pour l’écriture du code source, Visual Code p. ex. (qui est officiellement pris en charge). Quoi qu’il en soit, pensez avant toute chose à enregistrer la licence pour l’environnement Python. Vous pourrez dès lors lancer le premier programme de test. Le micrologiciel ne peut échanger des données qu’avec le portail Fortebit, qui lui aussi nécessite un compte. Son accès est gratuit durant les 30 premiers jours d’utilisation, puis soumis à un abonnement annuel. Polaris Cloud, le nom du portail en question, sert essentiellement à la surveillance simultanée d’une flotte de véhicules ou d’objets via un tableau de bord personnalisable. Il s’agit donc d’un service, et comme je ne m’intéresse ici qu’au matériel, je vous renvoie au site de Fortebit pour plus de détails.

L’application de démonstration n’échangera pas d’elle-même les données avec le système que vous comptez utiliser. Un certain travail préalable est ici nécessaire. Notez que ce sera le cas même si vous exploitez votre propre portail de suivi d’une flotte de véhicules. Même si cela n’entache en rien la bonne impression que laisse cette plateforme, voilà bien un point qui nécessiterait d’être amélioré. Encore une fois, rien ne vous oblige à travailler en Python, vous pouvez utiliser l’environnement Arduino. Le dépôt GitHub de Fortebit fournit les fichiers json qui permettent d’installer les paquets nécessaires depuis le Gestionnaire de cartes de l’EDI Arduino.

Le code d’exemple Python ci-dessous a été modifié pour permettre de tester rapidement le traceur. Il sert juste à montrer comment le matériel peut être connecté à un courtier MQTT générique, et est donc inadapté à toute utlisation pratique. Les changements nécessaires sont implantés avec quelques lignes de code Python puis rapidement transférés vers le matériel. Les données GPS sont ensuite envoyées à un courtier MQTT (de votre choix).

Pour résumer

Le matériel ouvert et la possibilité d’opter pour un environnement Python ou Arduino font du kit Polaris 3G kit+ de Fortebit une plateforme très intéressante pour le développement d’applications de suivi par GPS. La documentation gagnerait à être plus fournie, et quelques codes d’exemple supplémentaires ne seraient pas de trop, mais gageons que Fortebit n’attendra pas que sa plateforme gagne en popularité pour y remédier.

Le programme main.py ci-dessous (qui est donc une version modifiée du code original) vous permettra de démarrer le traceur Polaris 3G kit+ avec votre propre courtier MQTT.

 

# POLARIS_FORTEBIT
# Created at 2019-07-26 11:34:26.282569
# modified by someone(tm) 06-08-2020 for generic MQTT ( unsecured ) transport
# to use test.mosquitto.org as mqtt broker
# not for production use at all !
from mqtt import mqtt
from fortebit.polaris import polaris
from fortebit.polaris import modem
from fortebit.polaris import gnss
import vm
import sfw
vm.set_option(vm.VM_OPT_RESET_ON_EXCEPTION, 1)
vm.set_option(vm.VM_OPT_TRACE_ON_EXCEPTION, 1)
vm.set_option(vm.VM_OPT_RESET_ON_HARDFAULT, 1)
vm.set_option(vm.VM_OPT_TRACE_ON_HARDFAULT, 1)
import streams
s = streams.serial()

import mcu
import timestamp
import timers
import ssl
import requests
import threading
from wireless import gsm
import utils

sleep(1000)

# CONFIG
poll_time = 100                     # poll inputs at twice the specified period in ms
gps_period = 10000                  # gps lat,lon and speed telemetry period in ms
update_period = 6 * gps_period      # other telemetry data period in ms
no_ignition_period = 300000         # no ignition telemetry data period in ms

fw_version = "1.11"

# POLARIS INIT
try:
    print("Polaris default app")
    polaris.init()
    
    print("MCU UID:", [hex(b) for b in mcu.uid()])
    print("VM info:", vm.info())
    print("FW version:", fw_version)
    print("Watchdog was triggered:", sfw.watchdog_triggered())
    
    polaris.ledRedOn()
    polaris.ledGreenOff()

except Exception as e:
    print("Failed polaris init with", e)
    sleep(500)
    mcu.reset()


# INIT HW

try:
    print("Initializing Modem...")
    modem = modem.init()
    print("Initializing GNSS...")
    gnss = gnss.init()
    # verify preconditions and start utility thread
    utils.start()

    print("Starting Accelerometer...")
    import accel
    accel.start()
    print("Starting GNSS...")
    gnss.start()
    gnss.set_rate(2000)

    print("Starting Modem...")
    modem.startup()
    
    # enable modem/gnss utilities
    utils.modem = modem
    utils.gnss = gnss

    sfw.watchdog(0, 30000)
    sfw.kick()
    if utils.check_terminal(s):
        utils.do_terminal(s)

    minfo = gsm.mobile_info()
    print("Modem:", minfo)
    
    # enable SMS checking
    utils.check_sms = True
except Exception as e:
    print("Failed init hw with", e)
    sleep(500)
    mcu.reset()


# GATHERING SETTINGS
name = None
apn = None
email = None

try:
    print("Read settings...")
    settings = utils.readSettings()
    sfw.kick()

    if "apn" in settings:
        apn = settings["apn"]
        print("APN:", apn)

    if "email" in settings:
        email = settings["email"]
        print("Email:", email)

    if "name" in settings:
        name = settings["name"]
        print("Name:", name)

    if apn is not None and not utils.validate_apn(apn):
        print("Invalid APN!")
        apn = None
    if email is not None and not utils.validate_email(email):
        print("Invalid email!")
        email = None
    if name is None:
        name = "Polaris"
        print("Saving name:", name)
        settings["name"] = name
        utils.saveSettings(settings)
    if apn is None:
        print("APN is not defined, can't connect to Internet!")
        apn = utils.request_apn(s)
        print("Saving email:", email)
        print("Saving APN:", apn)
        settings["apn"] = apn
        utils.saveSettings(settings)
    if email is None:
        print("email is not defined, can't register to Cloud!")
        email = utils.request_email(s)
        settings["email"] = email
        utils.saveSettings(settings)
except Exception as e:
    print("Failed gathering settings with", e)
    sleep(500)
    mcu.reset()


# GSM ATTACH
try:
    print("Waiting for network...",end='')
    for _ in range(150):
        sfw.kick()
        ninfo = gsm.network_info()
        if ninfo[6]:
            break
        sleep(1000)
        if (_ % 10) == 9:
            print('.',end='')
    else:
        raise TimeoutError
    print("ok!")
    print("Attached to network:", ninfo)

    print("Activating data connection...")
    for _ in range(3):
        try:
            gsm.attach(apn)
            break
        except Exception as e:
            print("Retrying...", e)
        try:
            gsm.detach()
            sleep(5000 * (_ + 1))
        except:
            pass
    else:
        raise TimeoutError
    linfo = gsm.link_info()
    print("Connection parameters:", linfo)
except Exception as e:
    print("Network failure", e)
    sleep(500)
    mcu.reset()



# TELEMETRY LOOP
try:
    accel.get_sigma()  # reset accumulated value
    sleep(500)

    last_time = -(no_ignition_period + gps_period) # force sending data immediately
    counter = 0
    sos = -1
    connected = True
    ignition = None
    sos = None
    telemetry = {}
    disconn_time = None

    while True:
        # allow other threads to run while waiting
        sleep(poll_time*2)
        sfw.kick()
        now_time = timers.now()

        if utils.check_terminal(s):
            utils.do_terminal(s)

        # read inputs
        old_ign = ignition
        ignition = polaris.getIgnitionStatus()
        old_sos = sos
        sos = polaris.getEmergencyStatus()
        extra_send = False



        # led waiting status
        utils.status_led(False, ignition, connected)

        if sos != old_sos:
            telemetry['sos'] = sos
            extra_send = True

        if ignition != old_ign:
            telemetry['ignition'] = ignition
            extra_send = True

        if not extra_send:
            if ignition == 0:
                # sleep as indicated by rate for no ignition
                if now_time - last_time < no_ignition_period - poll_time:
                    continue
                extra_send = True
            else:
                # sleep as indicated by rate
                if now_time - last_time < gps_period - poll_time:
                    continue

        ts = modem.rtc()
        #print("MODEM RTC =", ts)

        if counter % (update_period / gps_period) == 0 or extra_send:
            telemetry['ignition'] = ignition
            telemetry['sos'] = sos

            if polaris.isBatteryBackup():
                telemetry['charger'] = -1
            else:
                telemetry['battery'] = utils.decimal(3, polaris.readMainVoltage())
                telemetry['charger'] = polaris.getChargerStatus()

            telemetry['backup'] = utils.decimal(3, polaris.readBattVoltage())
            telemetry['temperature'] = utils.decimal(2, accel.get_temperature())
            telemetry['sigma'] = utils.decimal(3, accel.get_sigma())

            pr = accel.get_pitchroll()
            telemetry['pitch'] = utils.decimal(1, pr[0])
            telemetry['roll'] = utils.decimal(1, pr[1])

        if gnss.has_fix():
            fix = gnss.fix()
            # only transmit position when it's accurate
            if fix[6] < 2.5:
                telemetry['latitude'] = utils.decimal(6, fix[0])
                telemetry['longitude'] = utils.decimal(6, fix[1])
                telemetry['speed'] = utils.decimal(1, fix[3])
                if counter % (update_period / gps_period) == 0 or extra_send:
                    telemetry['altitude'] = utils.decimal(1, fix[2])
                    telemetry['COG'] = utils.decimal(1, fix[4])
            if counter % (update_period / gps_period) == 0 or extra_send:
                telemetry['nsat'] = fix[5]
                telemetry['HDOP'] = utils.decimal(2, fix[6])
            # replace timestamp with GPS
            ts = fix[9]
        elif ts is not None and ts[0] < 2019:
            ts = None

        if ts is not None:
            ts = str(timestamp.to_unix(ts)) + "000"

        counter += 1
        last_time = now_time

        # led sending status
        utils.status_led(True, ignition, connected)
        telemetry['counter']=counter
        telemetry['dev_ts']=ts
        
        sfw.kick()
        # set the mqtt id to "zerynth-mqtt"
        client = mqtt.Client("zerynth-mqtt",True)
        # and try to connect to "test.mosquitto.org"
      
        for retry in range(10):
            try:
                client.connect("test.mosquitto.org", 60)
                break
            except Exception as e:
                print("connecting...")
        print("connected.")
        print("Publishing:", telemetry)
        
        client.publish("test", str(telemetry))
        client.loop()  
        ok = False;
        

except Exception as e:
    print("Failed telemetry loop", e)
    sleep(500)
    mcu.reset()

 


VF: Hervé Moreau