Contents

Streamr Network - Monitoring Home Electricity Consumption

👉Consumption DAO Website👈

1. Problem Statement (Individual)

🏘️ As a Homeowner… How much Electricity should my house be consuming?

2. Problem Statement (Macro)

🌎 Electricity is the fastest-growing source of final energy demand

🔌 Over the next 25 years its growth is set to outpace energy consumption as a whole

3. Solution

💡 Software + Hardware combination that enables Complete Home Energy Monitoring with incentives.

4. Incentives

4.1 Monetary

By pairing the project with a green web3 crypto, homeowners could earn ‘crypto’

4.2 Reconciliation Mechanism

/posts/streamr_mqtt/Checks_and_Balances.png

4.3 Architecture

Raspi -> NodeRed -> Streamr -> InfluxDB -> Grafana

5. Hardware

5.1 BOM

  • Raspberry Pi 4
  • CT Sensors
  • RPICT Series 5

6. Software

6.1 Python Code

from dotenv import load_dotenv
import os
from datetime import datetime
import time
import json
import numpy as np
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import paho.mqtt.client as mqtt_client
import serial

# Load settings from the process environment (and from a .env file, if any).
load_dotenv()

streamr_url = os.getenv("streamr_url")
streamr_port = os.getenv("streamr_port")
streamID = os.getenv("streamID")
streamr_api_key = os.getenv("streamr_api_key")
streamr_privatekey = os.getenv("streamr_privatekey")
flux_url = os.getenv("flux_url")
flux_port = os.getenv("flux_port")
flux_token = os.getenv("flux_token")
flux_org = os.getenv("flux_org")
flux_bucket = os.getenv("flux_bucket")

# The URL/port settings are concatenated into endpoint strings below.
# os.getenv returns None for an unset variable, which would otherwise surface
# as an opaque "NoneType + str" TypeError, so fail early and name the culprit.
_missing = [
    name
    for name, value in (
        ("streamr_url", streamr_url),
        ("streamr_port", streamr_port),
        ("flux_url", flux_url),
        ("flux_port", flux_port),
    )
    if value is None
]
if _missing:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing)
    )

# "<url>:<port>" endpoint strings.
streamr_broker = streamr_url + ":" + streamr_port
flux_instance = flux_url + ":" + flux_port

# Names assigned positionally to the fields of one serial line:
# 'vref' first, followed by 'rp1' through 'rp32'.
channels = ['vref', *(f'rp{n}' for n in range(1, 33))]

#-----serial input-----
# Open the serial device at 38400 baud; `ser` is the handle read from later.
ser = serial.Serial(port='/dev/ttyAMA0', baudrate=38400)

#-----read and format serial-----
def serial(ser):
    """Read one line from the serial port and split it into fields.

    The raw bytes are decoded as ASCII, the line terminator is removed,
    spaces are converted to commas, and the result is split on commas, so
    both space- and comma-separated input yield a list of field strings.

    Args:
        ser: An object whose ``readline()`` returns ``bytes`` (the script
            passes its open serial port).

    Returns:
        list[str]: The fields of the line, in the order they appear.

    Raises:
        UnicodeDecodeError: If the line contains non-ASCII bytes.
    """
    # NOTE: defining this function rebinds the module-level name `serial`,
    # which the file also uses for the imported `serial` module; any
    # `serial.Serial(...)` call must therefore run before this definition.
    raw = ser.readline()
    # Strip only real terminator characters rather than blindly dropping the
    # last two characters, which cut off data whenever a line ended with a
    # bare "\n" or had no terminator at all.
    text = str(raw, "ascii").rstrip("\r\n")
    return text.replace(" ", ",").split(",")

# -----connect to streamr broker-----
# def connect_streamr():
#     def on_connect(streamr_client, userdata, flags, rc):
#         if rc == 0:
#             print("Connected to MQTT Broker!")
#         else:
#             print("Failed to connect, return code %d\n", rc)

#     streamr_client = mqtt_client.Client(streamr_privatekey)
#     streamr_client.on_connect = on_connect
#     streamr_client.username_pw_set("", streamr_api_key)
#     streamr_client.connect(broker, port)
#     return streamr_client

#-----connect to influxdb instance-----
def connect_influxdb():
    """Create an InfluxDB client and return its synchronous write API.

    The client is built from the module-level ``flux_instance``,
    ``flux_token`` and ``flux_org`` settings.

    Returns:
        The object returned by ``InfluxDBClient.write_api`` when called with
        ``write_options=SYNCHRONOUS``.
    """
    return InfluxDBClient(
        url=flux_instance,
        token=flux_token,
        org=flux_org,
    ).write_api(write_options=SYNCHRONOUS)

#-----publish to influxdb instance-----
def publish_flux(write_api, serial_read):
    """Write one set of serial readings to the InfluxDB bucket.

    Channel names from the module-level ``channels`` list are paired
    positionally with the serial fields (pairing stops at the shorter of the
    two), and each pair becomes one record string of the form
    ``consumption,loc=<channel> power=<value>``.

    Args:
        write_api: Write API object as returned by ``connect_influxdb``.
        serial_read: Sequence of field values read from the serial port.
    """
    records = []
    for loc, power in zip(channels, serial_read):
        records.append(f'consumption,loc={loc} power={power}')
    write_api.write(flux_bucket, flux_org, records)

#-----publish to streamr-----
def publish_mqtt(streamr_client, serial_read):
    """Publish one set of serial readings as a JSON object to ``streamID``.

    The readings are keyed by the module-level ``channels`` names, serialized
    with ``json.dumps`` and sent via ``streamr_client.publish``. The outcome
    is reported on stdout.

    Args:
        streamr_client: Client object exposing ``publish(topic, payload)``
            whose result has the status code at index 0.
        serial_read: Sequence of field values read from the serial port.
    """
    # zip() pairs by position and stops at the shorter sequence, so a serial
    # line with more fields than there are channel names no longer raises
    # IndexError (publish_flux pairs the same data the same way).
    payload = dict(zip(channels, serial_read))
    msg = json.dumps(payload, indent=4)
    result = streamr_client.publish(streamID, msg)
    status = result[0]
    if status == 0:
        print(f"Success - `{msg}` to topic `{streamID}`")
    else:
        print(f"Failed to send message to topic {streamID}")


# Script entry: connect to InfluxDB, read a single line from the serial port,
# and write that one set of readings to the bucket.
write_api = connect_influxdb()
serial_read = serial(ser)
publish_flux(write_api, serial_read)



#def run():
#   influxdb
#    write_api = connect_influxdb()
#    serial_read = query()
#    publish_flux(write_api, serial_real)
#   streamer
    # client = connect_streamr()
    # client.loop_start()
    # publish(client)


# def publish_mqtt(client):
#     while True:
#         time.sleep(1)
#         line = ser.readline()
#         Z = str(line, "ascii").replace(' ',',')
#         Z = Z[:-2]
#         Z = Z.split(",")
#         Z = [str(Z[i]) for i in range(len(Z))]
#         dic = {channels[i]: Z[i] for i in range(len(Z))}
#         msg = json.dumps(dic, indent = 4)
#         result = client.publish(streamID, msg)
#         status = result[0]
#         if status == 0:
#             print(f"Success - `{msg}` to topic `{streamID}`")
#         else:
#             print(f"Failed to send message to topic {streamID}")