My brother and I share a car, and we have to text each other so that we know if someone is using the car. Then I started thinking, "wouldn't it be nice if we had a system that did this for us?" And that's how this project took shape. This project also happened to be for an Embedded Systems class at John Brown University.
My design involved two Raspberry Pi's (one in my car and one in my house) that could communicate with each other using an nRF24L01 chip. The pi in the car had a GPS module attached so that it could send the coordinates of the car to the pi in the house when the car was parked. After the pi in the house received the coordinates, it would email a static map image to my phone using the Mapbox API.
I was able to get the car pi to retrieve the GPS data from the GPS module, parse data, and send the data to the other pi, however, I was unable to send the email from the pi because the campus network blocked that kind of traffic.
GPS Code
Pythonimport time
import serial
import sys
import argparse
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pyrf24 import RF24, RF24_PA_LOW
radio = RF24(22, 0)
ser = serial.Serial(port='/dev/serial0',baudrate=9600,timeout=1)
ser.reset_input_buffer()
valid = 'V' # Position status (A = data valid, V = data invalid)
payload = ""
# For this example, we will use different addresses
# An address need to be a buffer protocol object (bytearray)
address = [b"1Node", b"2Node"]
# It is very helpful to think of an address as a path instead of as
# an identifying device destination
# to use different addresses on a pair of radios, we need a variable to
# uniquely identify which address this radio will use to transmit
# 0 uses address[0] to transmit, 1 uses address[1] to transmit
radio_number = bool(
int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
)
# initialize the nRF24L01 on the spi bus
if not radio.begin():
raise OSError("nRF24L01 hardware isn't responding")
radio.dynamic_payloads = True
# radio.ack_payloads = True
# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.set_pa_level(RF24_PA_LOW) # RF24_PA_MAX is default
# set TX address of RX node into the TX pipe
radio.open_tx_pipe(address[radio_number]) # always uses pipe 0
# set RX address of TX node into an RX pipe
radio.open_rx_pipe(1, address[not radio_number]) # using pipe 1
def master(count: int = 10): # count = 5 will only transmit 5 packets
"""Transmits an incrementing float every second"""
radio.listen = False # ensures the nRF24L01 is in TX mode
while count:
line = read_uart()
payload = encode_payload(line)
buffer = payload.encode('utf-8')
start_timer = time.monotonic_ns() # start timer
result = radio.write(buffer)
end_timer = time.monotonic_ns() # end timer
if not result:
print("Transmission failed or timed out")
time.sleep(1)
else:
print(
"Transmission successful! Time to Transmit:",
f"{(end_timer - start_timer) / 1000} us. Sent: {buffer}",
)
time.sleep(0.5)
count -= 1
def slave(timeout: int = 6):
"""Polls the radio and prints the received value. This method expires
after 6 seconds of no received transmission."""
radio.listen = True # put radio into RX mode and power up
start = time.monotonic()
while (time.monotonic() - start) < timeout:
has_payload, pipe_number = radio.available_pipe()
if has_payload:
length = radio.get_dynamic_payload_size() # grab the payload length
received = radio.read(length) # also clears radio.irq_dr status flag
payload = received.decode('utf-8')
print(f'payload: {payload}\n')
decoded = decode_payload(str(payload))
# print details about the received packet
print(f"Received {length} bytes on pipe {pipe_number}: {decoded[0]}, {decoded[1]}")
start = time.monotonic() # reset the timeout timer
# recommended behavior is to keep in TX mode while idle
radio.listen = False # put the nRF24L01 is in TX mode
def set_role():
"""Set the role using stdin stream. Timeout arg for slave() can be
specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)
:return:
- True when role is complete & app should continue running.
- False when app should exit
"""
user_input = (
input(
"*** Enter 'R' for receiver role.\n"
"*** Enter 'T' for transmitter role.\n"
"*** Enter 'Q' to quit example.\n"
)
or "?"
)
user_input = user_input.split()
if user_input[0].upper().startswith("R"):
slave(*[int(x) for x in user_input[1:2]])
return True
if user_input[0].upper().startswith("T"):
master(*[int(x) for x in user_input[1:2]])
return True
if user_input[0].upper().startswith("Q"):
radio.power = False
return False
print(user_input[0], "is an unrecognized input. Please try again.")
return set_role()
def convert_lat_lon(line, Round=True):
"""input: list of strings
returns: list of strings"""
lat_str = line[3]
lat_dir = line[4]
lon_str = line[5]
lon_dir = line[6]
return_string = []
# Convert latitude and longitude to decimal degrees
lat_degrees = int(lat_str[:2]) + float(lat_str[2:]) / 60
lon_degrees = int(lon_str[:3]) + float(lon_str[3:]) / 60
# Add direction (North/South for latitude, East/West for longitude)
lat_degrees *= 1 if lat_dir == "N" else -1
lon_degrees *= 1 if lon_dir == "E" else -1
if (Round):
return_string.append(str(round(lat_degrees,4)))
return_string.append(str(round(lon_degrees,4)))
else:
return_string.append(str(lat_degrees))
return_string.append(str(lon_degrees))
return(return_string)
def encode_payload(strs):
""" input: list of strings, returns: a string
format: length(lat_e)#lat_elength(lon_e)#lon_e"""
return_string = ""
for string in strs:
return_string += str(len(string)) + "#" + string
return return_string
def decode_payload(str):
"""returns a list of strings
format: output_list[0] - lat, output_list[1] - lon"""
output_list, string_pointer = [], 0
while string_pointer < len(str):
length_pointer = string_pointer
while str[length_pointer] != "#":
length_pointer += 1
length = int(str[string_pointer:length_pointer])
output_list.append(str[length_pointer + 1 : length_pointer + 1 + length])
string_pointer = length_pointer + length + 1
return output_list
def read_uart():
""" reads gps data from pin
returns list of strings"""
# print("Establishing connection to satelites...")
while True:
if ser.in_waiting > 0:
try:
line = ser.readline().decode('utf-8').rstrip()
if line[:6] == "$GPRMC":
lin_2 = line.split(',')
print(lin_2)
valid = lin_2[2]
if valid == 'A':
print("valid")
return_str = convert_lat_lon(lin_2)
return(return_str)
else:
print("Searching...")
except UnicodeDecodeError as e:
print(f"Error decoding line: {e}")
line = "" # Handle the error by assigning an empty string or another suitable value
print(sys.argv[0]) # print example name
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
"-r",
"--role",
type=int,
choices=range(2),
help="'1' specifies the TX role. '0' specifies the RX role.",
)
args = parser.parse_args() # parse any CLI args
try:
if args.role is None: # if not specified with CLI arg '-r'
while set_role():
pass # continue example until 'Q' is entered
elif bool(args.role): # if role was set using CLI args, run role once and exit
master()
else:
slave()
except KeyboardInterrupt:
print(" Keyboard Interrupt detected. Exiting...")
radio.power = False
sys.exit()
else:
print(" Run slave() on receiver\n Run master() on transmitter")
import smtplib
import requests
import base64
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
message = MIMEMultipart()
message["To"] = ''
message["From"] = 'Cherry'
message["Subject"] = 'For the demonstration'
token = "insert mapbox api token"
input_str = input("Enter the coordinates: ")
split_str = input_str.split(', ')
lon = split_str[1]
lat = split_str[0]
mapbox_url = f"https://api.mapbox.com/styles/v1/mapbox/satellite-streets-v12/static/pin-l+ff0000({lon},{lat})/{lon},{lat},16.29,0/500x500?access_token={token}"
# # Download the image
# response = requests.get(mapbox_url)
# print("stats:", response.status_code)
title = '<b> Title line here. </b>'
html_content = f'''
<html>
<body>
<img width="600" src="{mapbox_url}" alt="Mapbox map" />
</body>
</html>
'''
# Create the MIMEText message with HTML content
messageText = MIMEText(html_content, 'html')
message.attach(messageText)
email = '' # gmail to send from
password = '' # password for sender gmail
server = smtplib.SMTP('smtp.gmail.com:587')
status_code, response = server.ehlo('Gmail')
print(f"[*] Echoing the server: {status_code} {response}")
status_code, response = server.starttls()
print(f"[*] Starting TLS connection: {status_code} {response}")
status_code, response = server.login(email,password)
print(f"[*] Logging in: {status_code} {response}")
fromaddr = ''
toaddrs = ''
server.sendmail(fromaddr,toaddrs,message.as_string())
server.quit()
Comments