aboutsummaryrefslogtreecommitdiff
path: root/src/pico/adafruit_bus_device/i2c_device.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pico/adafruit_bus_device/i2c_device.py')
-rwxr-xr-xsrc/pico/adafruit_bus_device/i2c_device.py187
1 files changed, 187 insertions, 0 deletions
diff --git a/src/pico/adafruit_bus_device/i2c_device.py b/src/pico/adafruit_bus_device/i2c_device.py
new file mode 100755
index 0000000..8f4fc9d
--- /dev/null
+++ b/src/pico/adafruit_bus_device/i2c_device.py
@@ -0,0 +1,187 @@
+# SPDX-FileCopyrightText: 2016 Scott Shawcroft for Adafruit Industries
+#
+# SPDX-License-Identifier: MIT
+
+"""
+`adafruit_bus_device.i2c_device` - I2C Bus Device
+====================================================
+"""
+
+import time
+
+try:
+ from typing import Optional, Type
+ from types import TracebackType
+ from circuitpython_typing import ReadableBuffer, WriteableBuffer
+
+ # Used only for type annotations.
+ from busio import I2C
+except ImportError:
+ pass
+
+
+__version__ = "0.0.0+auto.0"
+__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BusDevice.git"
+
+
+class I2CDevice:
+ """
+ Represents a single I2C device and manages locking the bus and the device
+ address.
+
+ :param ~busio.I2C i2c: The I2C bus the device is on
+ :param int device_address: The 7 bit device address
+ :param bool probe: Probe for the device upon object creation, default is true
+
+ .. note:: This class is **NOT** built into CircuitPython. See
+ :ref:`here for install instructions <bus_device_installation>`.
+
+ Example:
+
+ .. code-block:: python
+
+ import busio
+ from board import *
+ from adafruit_bus_device.i2c_device import I2CDevice
+
+ with busio.I2C(SCL, SDA) as i2c:
+ device = I2CDevice(i2c, 0x70)
+ bytes_read = bytearray(4)
+ with device:
+ device.readinto(bytes_read)
+ # A second transaction
+ with device:
+ device.write(bytes_read)
+ """
+
+ def __init__(self, i2c: I2C, device_address: int, probe: bool = True) -> None:
+ self.i2c = i2c
+ self.device_address = device_address
+
+ if probe:
+ self.__probe_for_device()
+
+ def readinto(
+ self, buf: WriteableBuffer, *, start: int = 0, end: Optional[int] = None
+ ) -> None:
+ """
+ Read into ``buf`` from the device. The number of bytes read will be the
+ length of ``buf``.
+
+ If ``start`` or ``end`` is provided, then the buffer will be sliced
+ as if ``buf[start:end]``. This will not cause an allocation like
+ ``buf[start:end]`` will so it saves memory.
+
+ :param ~WriteableBuffer buffer: buffer to write into
+ :param int start: Index to start writing at
+ :param int end: Index to write up to but not include; if None, use ``len(buf)``
+ """
+ if end is None:
+ end = len(buf)
+ self.i2c.readfrom_into(self.device_address, buf, start=start, end=end)
+
+ def write(
+ self, buf: ReadableBuffer, *, start: int = 0, end: Optional[int] = None
+ ) -> None:
+ """
+ Write the bytes from ``buffer`` to the device, then transmit a stop
+ bit.
+
+ If ``start`` or ``end`` is provided, then the buffer will be sliced
+ as if ``buffer[start:end]``. This will not cause an allocation like
+ ``buffer[start:end]`` will so it saves memory.
+
+ :param ~ReadableBuffer buffer: buffer containing the bytes to write
+ :param int start: Index to start writing from
+ :param int end: Index to read up to but not include; if None, use ``len(buf)``
+ """
+ if end is None:
+ end = len(buf)
+ self.i2c.writeto(self.device_address, buf, start=start, end=end)
+
+ # pylint: disable-msg=too-many-arguments
+ def write_then_readinto(
+ self,
+ out_buffer: ReadableBuffer,
+ in_buffer: WriteableBuffer,
+ *,
+ out_start: int = 0,
+ out_end: Optional[int] = None,
+ in_start: int = 0,
+ in_end: Optional[int] = None
+ ) -> None:
+ """
+ Write the bytes from ``out_buffer`` to the device, then immediately
+ reads into ``in_buffer`` from the device. The number of bytes read
+ will be the length of ``in_buffer``.
+
+ If ``out_start`` or ``out_end`` is provided, then the output buffer
+ will be sliced as if ``out_buffer[out_start:out_end]``. This will
+ not cause an allocation like ``buffer[out_start:out_end]`` will so
+ it saves memory.
+
+ If ``in_start`` or ``in_end`` is provided, then the input buffer
+ will be sliced as if ``in_buffer[in_start:in_end]``. This will not
+ cause an allocation like ``in_buffer[in_start:in_end]`` will so
+ it saves memory.
+
+ :param ~ReadableBuffer out_buffer: buffer containing the bytes to write
+ :param ~WriteableBuffer in_buffer: buffer containing the bytes to read into
+ :param int out_start: Index to start writing from
+ :param int out_end: Index to read up to but not include; if None, use ``len(out_buffer)``
+ :param int in_start: Index to start writing at
+ :param int in_end: Index to write up to but not include; if None, use ``len(in_buffer)``
+ """
+ if out_end is None:
+ out_end = len(out_buffer)
+ if in_end is None:
+ in_end = len(in_buffer)
+
+ self.i2c.writeto_then_readfrom(
+ self.device_address,
+ out_buffer,
+ in_buffer,
+ out_start=out_start,
+ out_end=out_end,
+ in_start=in_start,
+ in_end=in_end,
+ )
+
+ # pylint: enable-msg=too-many-arguments
+
+ def __enter__(self) -> "I2CDevice":
+ while not self.i2c.try_lock():
+ time.sleep(0)
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[type]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> bool:
+ self.i2c.unlock()
+ return False
+
+ def __probe_for_device(self) -> None:
+ """
+ Try to read a byte from an address,
+ if you get an OSError it means the device is not there
+ or that the device does not support these means of probing
+ """
+ while not self.i2c.try_lock():
+ time.sleep(0)
+ try:
+ self.i2c.writeto(self.device_address, b"")
+ except OSError:
+ # some OS's dont like writing an empty bytesting...
+ # Retry by reading a byte
+ try:
+ result = bytearray(1)
+ self.i2c.readfrom_into(self.device_address, result)
+ except OSError:
+ # pylint: disable=raise-missing-from
+ raise ValueError("No I2C device at address: 0x%x" % self.device_address)
+ # pylint: enable=raise-missing-from
+ finally:
+ self.i2c.unlock()