Skip to content

event-driven

i2cd : Python + Twisted + smbus

Discuter en python avec des périphériques i2c c’est déjà assez facile grace à smbus… Jusque là aucun problème, sauf quand j’ai commencé à faire des communications concurrentes. En théorie, le bus va échanger “n’importe quoi” pendant la concurrence, et donc les échanges en cours risquent d’être perturbés/invalides.

C’est effectivement le cas, mais évidement les primitives que j’avais développés détectaient l’erreur i/o renvoyée par smbus, et retentais un certains nombre de fois les mêmes opérations. Seulement, il se trouve que certains périphériques i2c (comme ceux reposant sur certains MSP430) gèrent mal le fait d’avoir vu un début de communication i2c, sans une fin de communication valide. Et du coup, certains “plantent”, n’acceptent plus de communications i2c et nécessitent un redémarrage.

J’ai alors décidé de mettre en place un modeste daemon i2cd qui aurait pour rôle de centraliser les échanges i2c, et d’apporter une interface socket TCP simple, via twisted pour permettre aux différents applicatifs de communiquer avec.

L’avantage premier de twisted est d’être un event-driven networking engine, c’est à dire qu’il va déclencher l’appel de certaines de vos méthodes en fonction d’un évènement réseau (connexion établie, données reçues, …) ; n’étant pas un professionnel de twisted, j’ai d’ailleurs découvert que par défaut, il ne fait pas d’appels concurrents à Protocol.dataReceived (évitant d’avoir à utiliser threading.Lock)

Je précise aussi que je le client va échanger avec i2cd un message constitué de plusieurs bytes, et que i2cd ne devra pas mélanger les messages lorsque il les enverra sur le bus i2c.

Le but de ce post est un PoC serveur + client, ne recherchez donc pas la compatibilité python-3, le respect de la PEP8, ou autre (mais n’hésitez pas à la proposer)…

Le client (directement avec les sockets python natives) :

#!/usr/bin/python2.7

__author__ = 'davixx'
I2CD_HOST = '127.0.0.1'
I2CD_PORT = 422

# -------------------------------------------------------------------------------------------------------------------- #

import socket
import sys
import re
import time

# -------------------------------------------------------------------------------------------------------------------- #

I2CD_WELCOME_BANNER_RE = re.compile('^200\sWELCOME\s\-\sYOU\sARE\s\d+\sIN$')
I2CD_ANSWER_RE = re.compile('^(?P<code>\d{3})\s(?P<str>.+)$')
I2CD_READED_STR_RE = re.compile('^READED\s(?P<byte>\d+)$')

# -------------------------------------------------------------------------------------------------------------------- #

def remote_msg(addr, msg, read=False):

    res = False

    try:
        # connect
        socket_client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        socket_client.connect((I2CD_HOST, I2CD_PORT))

        # banner
        socket_client.send('I2CD CLIENT\n')
        data = socket_client.recv(64).strip()
        if I2CD_WELCOME_BANNER_RE.match(data) is None:
            raise NameError('Invalid Welcome banner')

        # MSG
        data = ('READ' if read else 'SEND') + ' ' + hex(addr)
        for e in msg:
            data += ' ' + str(e)
        socket_client.send(data + '\n')
        data = socket_client.recv(64)
        data = I2CD_ANSWER_RE.match(data)
        if data is None:
            raise NameError('Invalid answer')
        if data.group('code') == '200':
            res = True
        elif data.group('code') == '201':
            data = I2CD_READED_STR_RE.match(data.group('str'))
            if data is None:
                raise NameError('Invalid answer')
            res = data.group('byte')
            res = int(res)

        # disconnect
        socket_client.send('QUIT\n')
        socket_client.close()

    except:
        pass

    return res

# -------------------------------------------------------------------------------------------------------------------- #

if __name__ == '__main__':

    # tcp client
    try:
        print remote_msg(addr=0x44, msg=[229, 1, 131, 1, 106])
        while True:
            r = remote_msg(addr=0x44, msg=[228, 17, 7, 152], read=True)
            print r
            time.sleep(1)
            if r == 0:
                pass
                # break
    except KeyboardInterrupt:
        sys.exit(0)

Le serveur (avec twisted) :

#!/usr/bin/python2.7 -u
__author__ = 'davixx'

# -------------------------------------------------------------------------------------------------------------------- #

from twisted.internet import protocol, reactor, endpoints
import time
import smbus
import sys

# -------------------------------------------------------------------------------------------------------------------- #

class I2CdProtocol(protocol.Protocol):

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def __init__(self, factory):
        self.factory = factory
        self.ready = False  # default status : the client is not ready to ask everything else than "Welcome"

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def connectionMade(self):
        self.factory.numProtocols += 1

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def connectionLost(self, reason):
        self.ready = False
        self.factory.numProtocols -= 1

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def ready_or_reject(self):

        # ready, this function return here
        if self.ready:
            return True

        # not ready... be verbose to the client, and lose connection
        self.transport.write('500 PLEASE WELCOME FIRST\n')
        self.transport.loseConnection()
        return False

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    @staticmethod
    def parse_addr_and_msg(data):
        try:
            # data split
            data = data.split(' ')

            # extract addr
            addr = data.pop(0).strip()
            assert addr.startswith('0x')
            addr = addr[2:]
            addr = int(addr, 16)
            assert 0 < addr < 128

            # extract message
            msg = list()
            for e in data:
                e = e.strip()
                e = int(e)
                assert 0 <= e <= 255
                msg.append(e)
            assert 0 < len(msg) < 64

        except:
            return None

        else:
            return I2cMessage(addr=addr, msg=msg)

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def dataReceived(self, data):

        # init
        data = data.strip()

        # must be ready, unless for welcome message
        if data != 'I2CD CLIENT':
            # should be ready
            if not self.ready_or_reject():
                return

        # Welcome message needed first
        if data == 'I2CD CLIENT':
            self.transport.write('200 WELCOME - YOU ARE ' + str(self.factory.numProtocols) + ' IN\n')
            self.ready = True

        # QUIT
        elif data == 'QUIT':
            self.ready = False
            self.transport.write('200 BYE BYE - REMAINING CLIENTS COUNT ' + str(self.factory.numProtocols - 1) + '\n')
            self.transport.loseConnection()

        # SEND 0x44 229 1 131 1 106
        elif data.startswith('SEND '):
            i2c_message = self.parse_addr_and_msg(data[5:])
            if i2c_message is None or not isinstance(i2c_message, I2cMessage):
                self.transport.write('500 INVALID COMMAND STRUCT\n')
            else:
                sended = i2c_message.send()
                self.transport.write(('200 SENDED' if sended else '400 FAILED') + '\n')

        # READ 0x44 228 17 7 152
        elif data.startswith('READ '):
            i2c_message = self.parse_addr_and_msg(data[5:])
            if i2c_message is None or not isinstance(i2c_message, I2cMessage):
                self.transport.write('500 INVALID COMMAND STRUCT\n')
            else:
                sended = i2c_message.send()
                if not sended:
                    self.transport.write('400 FAILED\n')
                res = i2c_message.read()
                self.transport.write('201 READED ' + str(res) + '\n')

        # unknown command...
        else:
            self.transport.write('500 UNKNOWN COMMAND\n')
            self.transport.loseConnection()

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

# -------------------------------------------------------------------------------------------------------------------- #

class I2CdFactory(protocol.Factory):

    def __init__(self):
        if hasattr(protocol.Factory, '__init__'):
            protocol.Factory.__init__(self)
        self.numProtocols = 0

    def buildProtocol(self, addr):
        return I2CdProtocol(factory=self)

# -------------------------------------------------------------------------------------------------------------------- #

class I2cMessage(object):

    def __init__(self, addr, msg):
        self.bus = smbus.SMBus(1)
        self.max_try = 3
        self.addr = addr
        self.msg = msg

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def log(self, msg, **kwargs):  # TODO
        print msg

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def _bus_write_byte(self, byte, max_try=3):

        # check type : int or hex
        if not isinstance(byte, int):
            raise NameError('byte must be an instance of int')

        # verbose
        line = ' > @' + str(hex(self.addr)) + ' : ' + str(byte).rjust(3, ' ') + ' (' + bin(byte).rjust(10, ' ') + ') '
        self.log(line, is_low_level=True)

        tried = 0

        while True:
            try:
                time.sleep(0.01)
                self.bus.write_byte(self.addr, byte)
                return
            except:
                if tried < max_try:
                    tried += 1
                    continue
                raise

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def _bus_read(self, max_try=3):

        # verbose
        line = ' < @' + str(hex(self.addr)) + 'READ BYTE'.ljust(16, ' ')

        tried = 0

        while True:
            try:
                time.sleep(0.01)
                r = self.bus.read_byte(self.addr)
                line += ' => ' + str(r)
                self.log(line, is_low_level=True)
                return r
            except:
                if tried < max_try:
                    tried += 1
                    continue
                line += ' => NULL'
                self.log(line, is_low_level=True)
                self.log('!!!! ' + 'self.bus.read_byte(addr)' + 'failed, tried:' + str(max_try), is_low_level=True)
                raise

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def send(self):

        tried = 0
        while True:
            try:
                for a_byte in self.msg:
                    self._bus_write_byte(a_byte)
                return True
            except Exception as err:
                if tried < self.max_try:
                    tried += 1
                    time.sleep(0.05)
                    continue
                self.log(' !! smbus failed to addr: ' + str(self.addr) + ' on dataset : ' + str(self.msg) + ' err: ' + str(err))
                return False

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def read(self):
        try:
            return self._bus_read()
        except:
            return None

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

    def __repr__(self):
        return str(hex(self.addr)) + ' => ' + str(self.msg)

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -#

# -------------------------------------------------------------------------------------------------------------------- #

if __name__ == '__main__':

    # init twisted tcp server
    try:
        endpoints.serverFromString(reactor, 'tcp:422').listen(I2CdFactory())
        reactor.run()
    except KeyboardInterrupt:
        sys.exit(0)

# -------------------------------------------------------------------------------------------------------------------- #