rkeysiv
Published © GPL3+

Automated Car Tracking System

My brother and I share a car. This system helps us keep track of who has the car and where it is located on campus

IntermediateProtip410
Automated Car Tracking System

Things used in this project

Story

Read more

Code

GPS Code

Python
This script is based on the getting_started.py example code from the pyRF24 library: https://github.com/nRF24/pyRF24/. It ran on both the house pi and the car pi
import time
import serial
import sys
import argparse
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pyrf24 import RF24, RF24_PA_LOW


radio = RF24(22, 0)

ser = serial.Serial(port='/dev/serial0',baudrate=9600,timeout=1)
ser.reset_input_buffer()
valid = 'V'   # Position status (A = data valid, V = data invalid)

payload = ""

# For this example, we will use different addresses
# An address need to be a buffer protocol object (bytearray)
address = [b"1Node", b"2Node"]
# It is very helpful to think of an address as a path instead of as
# an identifying device destination

# to use different addresses on a pair of radios, we need a variable to
# uniquely identify which address this radio will use to transmit
# 0 uses address[0] to transmit, 1 uses address[1] to transmit
radio_number = bool(
    int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
)

# initialize the nRF24L01 on the spi bus
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

radio.dynamic_payloads = True

# radio.ack_payloads = True

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.set_pa_level(RF24_PA_LOW)  # RF24_PA_MAX is default

# set TX address of RX node into the TX pipe
radio.open_tx_pipe(address[radio_number])  # always uses pipe 0

# set RX address of TX node into an RX pipe
radio.open_rx_pipe(1, address[not radio_number])  # using pipe 1


def master(count: int = 10):  # count = 5 will only transmit 5 packets
    """Transmits an incrementing float every second"""
    radio.listen = False  # ensures the nRF24L01 is in TX mode

    while count:
        line = read_uart()
        payload = encode_payload(line)
        buffer = payload.encode('utf-8')
        
        start_timer = time.monotonic_ns()  # start timer    
        result = radio.write(buffer)
        end_timer = time.monotonic_ns()  # end timer
        if not result:
            print("Transmission failed or timed out")
            time.sleep(1)
        else:
            print(
                "Transmission successful! Time to Transmit:",
                f"{(end_timer - start_timer) / 1000} us. Sent: {buffer}",
            )
        time.sleep(0.5)
        count -= 1


def slave(timeout: int = 6):
    """Polls the radio and prints the received value. This method expires
    after 6 seconds of no received transmission."""
    radio.listen = True  # put radio into RX mode and power up

    start = time.monotonic()
    while (time.monotonic() - start) < timeout:
        has_payload, pipe_number = radio.available_pipe()
        if has_payload:
            length = radio.get_dynamic_payload_size()  # grab the payload length
            received = radio.read(length)  # also clears radio.irq_dr status flag
            payload = received.decode('utf-8')
            print(f'payload: {payload}\n')
            decoded = decode_payload(str(payload))
            # print details about the received packet
            print(f"Received {length} bytes on pipe {pipe_number}: {decoded[0]}, {decoded[1]}")
            start = time.monotonic()  # reset the timeout timer

    # recommended behavior is to keep in TX mode while idle
    radio.listen = False  # put the nRF24L01 is in TX mode


def set_role():
    """Set the role using stdin stream. Timeout arg for slave() can be
    specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)

    :return:
        - True when role is complete & app should continue running.
        - False when app should exit
    """
    user_input = (
        input(
            "*** Enter 'R' for receiver role.\n"
            "*** Enter 'T' for transmitter role.\n"
            "*** Enter 'Q' to quit example.\n"
        )
        or "?"
    )
    user_input = user_input.split()
    if user_input[0].upper().startswith("R"):
        slave(*[int(x) for x in user_input[1:2]])
        return True
    if user_input[0].upper().startswith("T"):
        master(*[int(x) for x in user_input[1:2]])
        return True
    if user_input[0].upper().startswith("Q"):
        radio.power = False
        return False
    print(user_input[0], "is an unrecognized input. Please try again.")
    return set_role()

def convert_lat_lon(line, Round=True):
    """input: list of strings
       returns: list of strings"""

    lat_str = line[3]
    lat_dir = line[4]
    lon_str = line[5]
    lon_dir = line[6]
    return_string = []

    # Convert latitude and longitude to decimal degrees
    lat_degrees = int(lat_str[:2]) + float(lat_str[2:]) / 60
    lon_degrees = int(lon_str[:3]) + float(lon_str[3:]) / 60

    # Add direction (North/South for latitude, East/West for longitude)
    lat_degrees *= 1 if lat_dir == "N" else -1
    lon_degrees *= 1 if lon_dir == "E" else -1

    if (Round):
        return_string.append(str(round(lat_degrees,4)))
        return_string.append(str(round(lon_degrees,4)))
    else:
        return_string.append(str(lat_degrees))
        return_string.append(str(lon_degrees))

    return(return_string)

def encode_payload(strs):
    """ input: list of strings, returns: a string
    format: length(lat_e)#lat_elength(lon_e)#lon_e"""

    return_string = ""
    for string in strs:
        return_string += str(len(string)) + "#" + string
    return return_string

def decode_payload(str):
    """returns a list of strings
    format: output_list[0] - lat, output_list[1] - lon"""

    output_list, string_pointer = [], 0
    while string_pointer < len(str):
        length_pointer = string_pointer
        while str[length_pointer] != "#":
            length_pointer += 1
        length = int(str[string_pointer:length_pointer])
        output_list.append(str[length_pointer + 1 : length_pointer + 1 + length])
        string_pointer = length_pointer + length + 1
    return output_list

def read_uart():
    """ reads gps data from pin
    returns list of strings"""

    # print("Establishing connection to satelites...")

    while True:
        if ser.in_waiting > 0:
            try:
                line = ser.readline().decode('utf-8').rstrip()

                if line[:6] == "$GPRMC":
                    lin_2 = line.split(',')
                    print(lin_2)
                    valid = lin_2[2]
                    if valid == 'A':
                        print("valid")

                        return_str = convert_lat_lon(lin_2)

                        return(return_str)
                    else:
                        print("Searching...")

            except UnicodeDecodeError as e:
                print(f"Error decoding line: {e}")
                line = ""  # Handle the error by assigning an empty string or another suitable value


print(sys.argv[0])  # print example name


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "-r",
        "--role",
        type=int,
        choices=range(2),
        help="'1' specifies the TX role. '0' specifies the RX role.",
    )
    args = parser.parse_args()  # parse any CLI args

    try:
        if args.role is None:  # if not specified with CLI arg '-r'
            while set_role():
                pass  # continue example until 'Q' is entered
        elif bool(args.role):  # if role was set using CLI args, run role once and exit
            master()
        else:
            slave()
    except KeyboardInterrupt:
        print(" Keyboard Interrupt detected. Exiting...")
        radio.power = False
        sys.exit()
else:
    print("    Run slave() on receiver\n    Run master() on transmitter")

Email Code

Python
This code connects to the mapbox api and sends an email
import smtplib
import requests
import base64
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

message = MIMEMultipart()
message["To"] = ''
message["From"] = 'Cherry'
message["Subject"] = 'For the demonstration'

token = "insert mapbox api token"

input_str = input("Enter the coordinates: ")

split_str = input_str.split(', ')
lon = split_str[1]
lat = split_str[0]

mapbox_url = f"https://api.mapbox.com/styles/v1/mapbox/satellite-streets-v12/static/pin-l+ff0000({lon},{lat})/{lon},{lat},16.29,0/500x500?access_token={token}"

# # Download the image
# response = requests.get(mapbox_url)
# print("stats:", response.status_code)

title = '<b> Title line here. </b>'
html_content = f'''
<html>
    <body>
        <img width="600" src="{mapbox_url}" alt="Mapbox map" />
    </body>
</html>
'''

# Create the MIMEText message with HTML content
messageText = MIMEText(html_content, 'html')
message.attach(messageText)

email = '' # gmail to send from
password = '' # password for sender gmail

server = smtplib.SMTP('smtp.gmail.com:587')

status_code, response = server.ehlo('Gmail')
print(f"[*] Echoing the server: {status_code} {response}")

status_code, response = server.starttls()
print(f"[*] Starting TLS connection: {status_code} {response}")

status_code, response = server.login(email,password)
print(f"[*] Logging in: {status_code} {response}")

fromaddr = ''
toaddrs  = ''
server.sendmail(fromaddr,toaddrs,message.as_string())

server.quit()

Credits

rkeysiv

rkeysiv

1 project • 0 followers

Comments