diff options
Diffstat (limited to 'src/pico/adafruit_bus_device/i2c_device.py')
| -rwxr-xr-x | src/pico/adafruit_bus_device/i2c_device.py | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/src/pico/adafruit_bus_device/i2c_device.py b/src/pico/adafruit_bus_device/i2c_device.py new file mode 100755 index 0000000..8f4fc9d --- /dev/null +++ b/src/pico/adafruit_bus_device/i2c_device.py @@ -0,0 +1,187 @@ +# SPDX-FileCopyrightText: 2016 Scott Shawcroft for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +""" +`adafruit_bus_device.i2c_device` - I2C Bus Device +==================================================== +""" + +import time + +try: + from typing import Optional, Type + from types import TracebackType + from circuitpython_typing import ReadableBuffer, WriteableBuffer + + # Used only for type annotations. + from busio import I2C +except ImportError: + pass + + +__version__ = "0.0.0+auto.0" +__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BusDevice.git" + + +class I2CDevice: + """ + Represents a single I2C device and manages locking the bus and the device + address. + + :param ~busio.I2C i2c: The I2C bus the device is on + :param int device_address: The 7 bit device address + :param bool probe: Probe for the device upon object creation, default is true + + .. note:: This class is **NOT** built into CircuitPython. See + :ref:`here for install instructions <bus_device_installation>`. + + Example: + + .. code-block:: python + + import busio + from board import * + from adafruit_bus_device.i2c_device import I2CDevice + + with busio.I2C(SCL, SDA) as i2c: + device = I2CDevice(i2c, 0x70) + bytes_read = bytearray(4) + with device: + device.readinto(bytes_read) + # A second transaction + with device: + device.write(bytes_read) + """ + + def __init__(self, i2c: I2C, device_address: int, probe: bool = True) -> None: + self.i2c = i2c + self.device_address = device_address + + if probe: + self.__probe_for_device() + + def readinto( + self, buf: WriteableBuffer, *, start: int = 0, end: Optional[int] = None + ) -> None: + """ + Read into ``buf`` from the device. The number of bytes read will be the + length of ``buf``. + + If ``start`` or ``end`` is provided, then the buffer will be sliced + as if ``buf[start:end]``. This will not cause an allocation like + ``buf[start:end]`` will so it saves memory. + + :param ~WriteableBuffer buffer: buffer to write into + :param int start: Index to start writing at + :param int end: Index to write up to but not include; if None, use ``len(buf)`` + """ + if end is None: + end = len(buf) + self.i2c.readfrom_into(self.device_address, buf, start=start, end=end) + + def write( + self, buf: ReadableBuffer, *, start: int = 0, end: Optional[int] = None + ) -> None: + """ + Write the bytes from ``buffer`` to the device, then transmit a stop + bit. + + If ``start`` or ``end`` is provided, then the buffer will be sliced + as if ``buffer[start:end]``. This will not cause an allocation like + ``buffer[start:end]`` will so it saves memory. + + :param ~ReadableBuffer buffer: buffer containing the bytes to write + :param int start: Index to start writing from + :param int end: Index to read up to but not include; if None, use ``len(buf)`` + """ + if end is None: + end = len(buf) + self.i2c.writeto(self.device_address, buf, start=start, end=end) + + # pylint: disable-msg=too-many-arguments + def write_then_readinto( + self, + out_buffer: ReadableBuffer, + in_buffer: WriteableBuffer, + *, + out_start: int = 0, + out_end: Optional[int] = None, + in_start: int = 0, + in_end: Optional[int] = None + ) -> None: + """ + Write the bytes from ``out_buffer`` to the device, then immediately + reads into ``in_buffer`` from the device. The number of bytes read + will be the length of ``in_buffer``. + + If ``out_start`` or ``out_end`` is provided, then the output buffer + will be sliced as if ``out_buffer[out_start:out_end]``. This will + not cause an allocation like ``buffer[out_start:out_end]`` will so + it saves memory. + + If ``in_start`` or ``in_end`` is provided, then the input buffer + will be sliced as if ``in_buffer[in_start:in_end]``. This will not + cause an allocation like ``in_buffer[in_start:in_end]`` will so + it saves memory. + + :param ~ReadableBuffer out_buffer: buffer containing the bytes to write + :param ~WriteableBuffer in_buffer: buffer containing the bytes to read into + :param int out_start: Index to start writing from + :param int out_end: Index to read up to but not include; if None, use ``len(out_buffer)`` + :param int in_start: Index to start writing at + :param int in_end: Index to write up to but not include; if None, use ``len(in_buffer)`` + """ + if out_end is None: + out_end = len(out_buffer) + if in_end is None: + in_end = len(in_buffer) + + self.i2c.writeto_then_readfrom( + self.device_address, + out_buffer, + in_buffer, + out_start=out_start, + out_end=out_end, + in_start=in_start, + in_end=in_end, + ) + + # pylint: enable-msg=too-many-arguments + + def __enter__(self) -> "I2CDevice": + while not self.i2c.try_lock(): + time.sleep(0) + return self + + def __exit__( + self, + exc_type: Optional[Type[type]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> bool: + self.i2c.unlock() + return False + + def __probe_for_device(self) -> None: + """ + Try to read a byte from an address, + if you get an OSError it means the device is not there + or that the device does not support these means of probing + """ + while not self.i2c.try_lock(): + time.sleep(0) + try: + self.i2c.writeto(self.device_address, b"") + except OSError: + # some OS's dont like writing an empty bytesting... + # Retry by reading a byte + try: + result = bytearray(1) + self.i2c.readfrom_into(self.device_address, result) + except OSError: + # pylint: disable=raise-missing-from + raise ValueError("No I2C device at address: 0x%x" % self.device_address) + # pylint: enable=raise-missing-from + finally: + self.i2c.unlock() |