Hardware components | ||||||
| × | 1 | ||||
Software apps and online services | ||||||
| ||||||
|
This project is an ambitious one, where we want to use some of the most questionable parts of the internet, comment sections and chatrooms, to create art.
We also want to make the project easily accessible so anyone can try their hand at generating some fine AI art. If you can't wait to try it out yourself, here's a link to the project.
Project VideoRemo.tvThe first step is gathering chat messages and comments. With our idea to make it easily accessible, Remo.tv is a natural pick. It's a robot streaming platform which allows you to connect all kinds of hardware to the internet, and let anyone control them. It also has a chat functionality and the ability to display images, which is exactly what we're looking for!
In this, case the hardware we'll be using is a Raspberry Pi.
Remo.tv has a great Github page with setup instructions.
Once setup, our Raspberry Pi can start receiving the chat messages, sent via Remo.tv.
DeepAIWith Remo.tv setup we can move on the artsy part. Each comment that we receive needs to be transformed into art, and to achieve this we'll be using some artificial intelligence magic.
Luckily there as another platform to make our life easy, DeepAI. They have all kinds of AI related features, but the ones we are interested in are their APIs.
The first API we use is Text To Image, all we need to do is send a text and wait for the magic to happen. In the above picture you can see the result of sending A dog with a funny hat.
Our generated picture isn't art just yet, so we use their Fast Style Transfer. This API expects an original image, in our case our generated one, and a style to apply. You can see the result of combining our dog with a funny hat and a classic van Gogh painting.
Dataflow & CodeWith al the separated pieces completed we can connect them. In the drawing we give an overview of the data flow:
- A chat message arrives from Remo.tv to our Raspberry Pi
- Our Pi sends this message to the Text To Image API and receives a generated image back
- This image, together with a randomly selected art style, is then sent to the Fast Style Transfer API
- After receiving the combination of the art style and the generated image, the Raspberry Pi streams the result to Remo.tv.
To be able to stream the generated image to Remo.tv we needed to write some custom code. Luckily, the lovely Remo.tv community helped us with that, thanks guys! :)
For all the curious, the full code is included in this project so you can get started right away.
ResultWith all that hard work done, it's time to enjoy some fine art!
- An old banana
- Chicken nugget
- Cute cats eating watermelon
- Floating on a cloud
- Loneliness
- My happy place
- Nowhere
Here is the link to Comment To Art on Remo.tv if you want to try it yourself!
import robot_util
import requests
import urllib.request
import requests
import random
from video.ffmpeg_process import *
import audio_util
import networking
import watchdog
import subprocess
import shlex
import schedule
import extended_command
import atexit
import os
import sys
import logging
import time
import signal
import re
styles = []
# Add your art styles urls here
styles.append('https://cdn.britannica.com/78/43678-050-F4DC8D93/Starry-Night-canvas-Vincent-van-Gogh-New-1889.jpg')
def textToImage(text):
r = requests.post(
"https://api.deepai.org/api/text2img",
data={
'text': text,
},
headers={'api-key': 'YOUR API KEY HERE'}
)
result = r.json()
return result['output_url']
def styleTransfer(targetURL):
index = random.randint(0, len(styles)-1)
styleUrl =styles[index]
r = requests.post(
"https://api.deepai.org/api/fast-style-transfer",
data={
'content': targetURL,
'style': styleUrl,
},
headers={'api-key': 'YOUR API KEY HERE'}
)
result = r.json()
return result['output_url']
def startFFMPEG(command, name, atExit, process):
try:
if sys.platform.startswith('linux') or sys.platform == "darwin":
ffmpeg_process=subprocess.Popen(command, stderr=subprocess.PIPE, shell=True, preexec_fn=os.setsid)
globals()[process] = ffmpeg_process
except OSError: # Can't find / execute ffmpeg
robot_util.terminate_controller()
return()
if ffmpeg_process != None:
try:
atexit.unregister(atExit) # Only python 3
except AttributeError:
pass
def generateImage(text, author):
robot_util.sendChatMessage("Generating...")
cleanText= re.sub('[^A-Za-z0-9]+', '', text)
cleanUser = re.sub('[^A-Za-z0-9]+', '', author)
targetURL = textToImage(text)
resultURL = styleTransfer(targetURL)
imgPath = "/home/pi/remotv/img/"+ str(int(time.time()))+ '-' + cleanText + '-' + cleanUser + ".png"
urllib.request.urlretrieve(resultURL, imgPath)
os.system("sudo killall ffmpeg")
videoCommandLine = 'ffmpeg -loop 1 -i {PathToFile} -s 720x480 -f mpegts -codec:v mpeg1video -an -b:v 2500k -muxdelay 0.001 http://remo.tv:1567/transmit?name=<YOUR ROBOT UID HERE>-video'
videoCommandLine = videoCommandLine.format(PathToFile=imgPath)
startFFMPEG(videoCommandLine, 'Video', atExitVideoCapture, 'video_process')
robot_util.sendChatMessage("Now showing: " + text + " by " + author)
def atExitVideoCapture():
print('it dead!')
def setup(robot_config):
return
def move(args, argsCustom):
try:
if args['button']['label'] == 'Generate':
message = argsCustom['message']
user = argsCustom['sender']
generateImage(message,user)
if args['button']['label'] == 'More Info':
robot_util.sendChatMessage("Type in the chat and press 'Generate' to create an piece of AI art.")
time.sleep(5)
except:
pass
return
from __future__ import print_function
# TODO move all defs to the top of the file, out of the way of the flow of execution.
# TODO full python3 support will involve installing the adafruit drivers, not using the ones from the repo
import traceback
import argparse
import robot_util
import os.path
import networking
import time
import schedule
import sys
import watchdog
import logging
import logging.handlers
import json
import atexit
if (sys.version_info > (3, 0)):
import importlib
import _thread as thread
else:
import thread
# borrowed unregister function from
# https://stackoverflow.com/questions/32097982/cannot-unregister-functions-from-atexit-in-python-2-7
def unregister(func, *targs, **kargs):
"""unregister a function previously registered with atexit.
use exactly the same aguments used for before register.
"""
for i in range(0,len(atexit._exithandlers)):
if (func, targs, kargs) == atexit._exithandlers[i] :
del atexit._exithandlers[i]
return True
return False
atexit.unregister = unregister
from threading import Timer
# fail gracefully if configparser is not installed
try:
from configparser import ConfigParser
robot_config = ConfigParser()
except ImportError:
print("Missing configparser module (python -m pip install configparser)")
sys.exit()
try:
with open('controller.conf', 'r') as fp:
robot_config.readfp(fp)
except IOError:
print("unable to read controller.conf, please check that you have copied controller.sample.conf to controller.conf and modified it appropriately.")
sys.exit()
except:
print ("Error in controller.conf:", sys.exc_info()[0])
sys.exit()
def write(self, config_file):
sections = 0
keys = 0
errors = 0
keys_in = 0
sections_in = 0
# count sections and keys in config
for section in self.sections():
sections_in += 1
for option in self[section]:
keys_in += 1
# read the existing config file
with open(config_file, 'r') as fp:
lines = fp.readlines()
# parse the config file line by line and update any keys found
for count, line in enumerate(lines):
temp_line = line.lstrip(' \t')
if temp_line[0] == '#' or temp_line[0] == ';':
continue # comment
elif temp_line[0] == '[':
section = temp_line.find(']')
if sections == -1:
errors += 1
log.error("ERROR: Unable parse line {} of config as [section] header : '{}'".format(count, line.rstrip('\r\n')))
else:
sections+=1
section = temp_line[1:section]
elif temp_line[0] == '\r' or temp_line[0] == '\n':
continue # blank line
else:
key = temp_line.find('=')
if key == -1:
errors += 1
log.error("ERROR: Unable parse line {} of config as key=value pair : '{}'".format(count, line.rstrip('\r\n')))
else:
keys += 1
key = temp_line[0:key]
key = key.strip()
value = robot_config.get(section, key)
lines[count] = '{}={}\n'.format(key, value)
if sections != sections_in:
log.error("ERROR: {} sections in file, {} in object".format(sections, sections_in))
if keys != keys_in:
log.error("ERROR: {} keys in file, {} in object".format(keys, keys_in))
# delete the existing config backup
if os.path.exists(config_file + '.bak'):
os.remove(config_file + '.bak')
os.rename(config_file, config_file+'.bak')
# write out the updated config file
f = open(config_file, 'w')
f.writelines(lines)
f.close
log.info("Config file saved.")
ConfigParser.write = write
handlingCommand = False
chat_module = None
move_handler = None
# pass the lock to robot_util so the controller can be terminated from outside.
terminate = thread.allocate_lock()
robot_util.terminate = terminate
# Enable logging, based upon the settings in the conf file.
log = logging.getLogger('RemoTV')
log.setLevel(logging.DEBUG)
console_handler=logging.StreamHandler()
console_handler.setLevel(logging.getLevelName(robot_config.get('logging', 'console_level')))
console_formatter=logging.Formatter('%(asctime)s - %(filename)s : %(message)s','%H:%M:%S')
console_handler.setFormatter(console_formatter)
try:
file_handler=logging.handlers.RotatingFileHandler(robot_config.get('logging', 'log_file'),
maxBytes=robot_config.getint('logging', 'max_size'),
backupCount=robot_config.getint('logging', 'num_backup'))
except IOError:
print("Error: Unable to write to log files. Check that they not owned by root, and that controller has write permissions to them")
sys.exit()
file_handler.setLevel(logging.getLevelName(robot_config.get('logging', 'file_level')))
file_formatter=logging.Formatter('%(asctime)s %(name)s %(levelname)s - %(message)s','%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(file_formatter)
log.addHandler(console_handler)
log.addHandler(file_handler)
log.critical('RemoTV Controller Starting up')
# Log all unhandled exceptions.
def exceptionLogger(exctype, value, tb):
log.critical("Unhandled exception of type : {}".format(exctype),exc_info=(exctype, value, tb))
sys.excepthook = exceptionLogger
# This is required to allow us to get True / False boolean values from the
# command line
def str2bool(v):
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')
# TODO assess these and other options in the config to see which ones are most
# appropriate to be overidden from the command line.
# check the command line for and config file overrides.
parser = argparse.ArgumentParser(description='start robot control program')
parser.add_argument('--robot-key', help='Robot API Key', default=robot_config.get('robot', 'robot_key'))
parser.add_argument('--type', help="Serial or motor_hat or gopigo2 or gopigo3 or l298n or motozero or pololu", default=robot_config.get('robot', 'type'))
parser.add_argument('--video', default=robot_config.get('camera', 'type'))
parser.add_argument('--custom-hardware', type=str2bool, default=robot_config.getboolean('misc', 'custom_hardware'))
parser.add_argument('--custom-tts', type=str2bool, default=robot_config.getboolean('misc', 'custom_tts'))
parser.add_argument('--custom-chat', type=str2bool, default=robot_config.getboolean('misc', 'custom_chat'))
parser.add_argument('--custom-video', type=str2bool, default=robot_config.getboolean('misc', 'custom_video'))
parser.add_argument('--ext-chat-command', type=str2bool, default=robot_config.getboolean('tts', 'ext_chat'))
parser.add_argument('--no-mic', dest='no_mic', action='store_true')
parser.set_defaults(no_mic=False)
parser.add_argument('--no-camera', dest='no_camera', action='store_true')
parser.set_defaults(no_camera=False)
parser.add_argument('--test', dest='test_mode', action='store_true')
parser.set_defaults(test_mode=False)
commandArgs = parser.parse_args()
log.debug('command line arguments : %s', commandArgs)
# push command line variables back into the config
robot_config.set('robot', 'robot_key', str(commandArgs.robot_key))
robot_config.set('robot', 'type', commandArgs.type)
robot_config.set('misc', 'custom_hardware', str(commandArgs.custom_hardware))
robot_config.set('misc', 'custom_tts', str(commandArgs.custom_tts))
robot_config.set('misc', 'custom_chat', str(commandArgs.custom_chat))
robot_config.set('tts', 'ext_chat', str(commandArgs.ext_chat_command))
if commandArgs.no_mic:
robot_config.set('camera', 'no_mic', 'True')
if commandArgs.no_camera:
robot_config.set('camera', 'no_camera', 'True')
# set variables pulled from the config
robotKey = commandArgs.robot_key
ext_chat = commandArgs.ext_chat_command
enable_async = robot_config.getboolean('misc', 'enable_async')
# check test_mode
test_mode = commandArgs.test_mode
if test_mode:
log.critical("Remo.TV Controller starting in test mode")
if ext_chat:
import extended_command
# Functions
def handle_message(ws, message):
log.debug(message)
try:
messageData = json.loads(message)
except:
log.error("Unable to parse message")
return
# try:
if "e" not in messageData:
log.error("Malformed Message")
event = messageData["e"]
data = messageData["d"]
if event == "BUTTON_COMMAND":
on_handle_command(data)
# handle_command(data)
elif event == "MESSAGE_RECEIVED":
if data['channel_id'] == networking.channel_id:
if data['type'] != "robot":
on_handle_chat_message(data)
elif event == "ROBOT_VALIDATED":
networking.handleConnectChatChannel(data["host"])
# except Exception as e:
# print(e)
def handle_chat_message(args):
log.info("chat message received: %s", args)
if ext_chat:
extended_command.handler(args)
message = args["message"]
try:
if not message[0] == ".":
tts.say(args)
except IndexError:
exit()
def handle_command(args):
global handlingCommand
handlingCommand = True
# catch move commands that happen before the controller has fully
# loaded and set a move handler.
if move_handler == None:
return
log.debug('got command : %s', args)
move_handler(args,argsCustom)
handlingCommand = False
def on_handle_command(*args):
log.debug("on_handle_command : {} {}".format(handlingCommand, enable_async))
if handlingCommand and not enable_async:
return
else:
thread.start_new_thread(handle_command, args)
global argsCustom
def on_handle_chat_message(*args):
global argsCustom
argsCustom = args[0]
if chat_module == None:
thread.start_new_thread(handle_chat_message, args)
else:
thread.start_new_thread(chat_module.handle_chat,args)
def restart_controller(command, args):
if extended_command.is_authed(args['sender']) == 2: # Owner
terminate.acquire()
# TODO : This really doesn't belong here, should probably be in start script.
# watch dog timer
if robot_config.getboolean('misc', 'watchdog'):
import os
os.system("sudo modprobe bcm2835_wdt")
os.system("sudo /usr/sbin/service watchdog start")
# Load and start TTS
log.info("Loading tts")
import tts.tts as tts
tts.setup(robot_config)
# Connect to the networking sockets
if not test_mode:
log.info("Loading networking")
networking.setupWebSocket(robot_config, handle_message)
# If custom hardware extensions have been enabled, load them if they exist. Otherwise load the default
# controller for the specified hardware type.
log.info("Loading hardware module")
log.debug("Loading module hardware/%s", commandArgs.type)
if commandArgs.custom_hardware:
if os.path.exists('hardware/hardware_custom.py'):
if (sys.version_info > (3, 0)):
module = importlib.import_module('hardware.hardware_custom')
else:
module = __import__('hardware.hardware_custom', fromlist=['hardware_custom'])
else:
log.warning("Unable to find hardware/hardware_custom.py")
if (sys.version_info > (3, 0)):
module = importlib.import_module('hardware.'+commandArgs.type)
else:
module = __import__("hardware."+commandArgs.type, fromlist=[commandArgs.type])
else:
if (sys.version_info > (3, 0)):
module = importlib.import_module('hardware.'+commandArgs.type)
else:
module = __import__("hardware."+commandArgs.type, fromlist=[commandArgs.type])
#call the hardware module setup function
module.setup(robot_config)
move_handler = module.move
# Load the video handler
log.info("Loading video module")
log.debug("Loading module video/%s", commandArgs.video)
if commandArgs.custom_video:
if os.path.exists('video/video_custom.py'):
if (sys.version_info > (3, 0)):
video_module = importlib.import_module('video.video_custom')
else:
video_module = __import__('video.video_custom', fromlist=['video_custom'])
else:
log.warning("Unable to find video/video_custom.py")
if (sys.version_info > (3, 0)):
video_module = importlib.import_module('video.'+commandArgs.video)
else:
video_module = __import__("video."+commandArgs.video, fromlist=[commandArgs.video])
else:
if (sys.version_info > (3, 0)):
video_module = importlib.import_module('video.'+commandArgs.video)
else:
video_module = __import__("video."+commandArgs.video, fromlist=[commandArgs.video])
# Setup the video encoding
video_module.setup(robot_config)
if not test_mode:
video_module.start()
#load the extended chat commands
if ext_chat:
log.info("Loading extended chat commands")
extended_command.setup(robot_config)
extended_command.move_handler=move_handler
move_handler = extended_command.move_auth
extended_command.add_command('.restart', restart_controller)
# Load a custom chat handler if enabled and exists
if commandArgs.custom_chat:
if os.path.exists('chat_custom.py'):
if (sys.version_info > (3, 0)):
chat_module = importlib.import_module('chat_custom')
else:
chat_module = __import__('chat_custom', fromlist=['chat_custom'])
chat_module.setup(robot_config, handle_chat_message)
else:
log.warning("Unable to find chat_custom.py")
atexit.register(log.debug, "Attempting to clean up and exit nicely")
if not test_mode:
log.critical('RemoTV Controller Started')
while not terminate.locked():
time.sleep(1)
watchdog.watch()
log.critical('RemoTV Controller Exiting')
else:
log.critical('RemoTV Controller Test Complete')
sys.exit()
Comments