import sys
from random import random
import os
import plotly.graph_objects as go
import pandas as pd
from optihood.plot_functions import getData
import numpy as np
from optihood.labelDict import labelDictGenerator, positionDictGenerator
from matplotlib import colors
def addCapacities(nodes, dataDict, buildings, UseLabelDict, labelDict, mergedLinks):
    """Build the hover text describing the installed capacity of every node.

    :param nodes: list of node labels, in the order used by the Sankey trace.
    :param dataDict: mapping of result sheet name to DataFrame; the sheets
        ``capTransformers__Building<i>`` and ``capStorages__Building<i>`` are
        read for every building ``i``. The first column of each row is taken
        as the capacity value.
    :param buildings: iterable of building numbers to process.
    :param UseLabelDict: not used; kept for interface compatibility.
    :param labelDict: mapping of component name to node label.
    :param mergedLinks: not used; kept for interface compatibility.
    :return: list parallel to ``nodes``; ``"sufficient"`` where no capacity
        was found, otherwise the rounded capacity followed by its unit suffix
        (``" kWh"`` or ``" L"`` for storages, ``" kW"`` for transformers).
    :raises KeyError: if a required sheet or a component label is missing.
    """
    capacities = ["sufficient"] * len(nodes)

    # Label -> position lookup; the first occurrence wins, like list.index().
    nodeIndex = {}
    for position, nodeName in enumerate(nodes):
        nodeIndex.setdefault(nodeName, position)

    for building in buildings:
        capTransformers = dataDict["capTransformers__Building" + str(building)]
        capStorages = dataDict["capStorages__Building" + str(building)]

        for component, row in capStorages.iterrows():
            # Read positionally so the column header does not matter.
            capacity = row.iloc[0]
            # Very small capacities are sometimes selected by mistake;
            # anything below 0.01 is treated as "not installed".
            if capacity < 0.01:
                continue
            label = labelDict[component]
            index = nodeIndex.get(label)
            if index is None:
                # Installed, but the component has no node in the diagram
                # (e.g. all of its flows were filtered out): nothing to tag.
                continue
            unit = " kWh" if "Bat" in label else " L"
            capacities[index] = str(round(capacity, 1)) + unit

        for component, row in capTransformers.iterrows():
            capacity = row.iloc[0]
            # Same reasoning as above, with a 0.001 threshold.
            if capacity < 0.001:
                continue
            # The row label is the string form of a (from, to) pair; the
            # transformer is whichever end is not a bus.
            parts = component.split("'")
            component = parts[3] if "Bus" in parts[1] else parts[1]
            index = nodeIndex.get(labelDict[component])
            if index is None:
                continue
            # Keep a second decimal when one decimal would display as 0.
            digits = 2 if round(capacity, 1) == 0 else 1
            capacities[index] = str(round(capacity, digits)) + " kW"

    return capacities
def readResults(fileName, buildings, ColorDict, UseLabelDict, labelDict, positionDict, labels, mergedLinks):
    """Load a results file and turn it into a plotly Sankey trace.

    The result sheets are read with ``getData``, converted into node and link
    lists by ``createSankeyData``, annotated with ``addCapacities`` and
    coloured with ``createColorList``.

    :return: a one-element list holding the ``go.Sankey`` trace.
    """
    resultSheets = getData(fileName)
    nodes, sources, targets, values, x, y = createSankeyData(
        resultSheets, resultSheets.keys(), UseLabelDict, labelDict, positionDict, buildings, mergedLinks
    )
    capacities = addCapacities(nodes, resultSheets, buildings, UseLabelDict, labelDict, mergedLinks)

    nodesColors = pd.Series(createColorList(nodes, ColorDict, labels))

    # A link starts with the colour of its source node, but a target coloured
    # as dhw, sh or elec overrides it; later carriers take precedence.
    linksColors = nodesColors[sources]
    for carrier in ("dhw", "sh", "elec"):
        linksColors = np.where(nodesColors[targets] == ColorDict[carrier], ColorDict[carrier], linksColors)

    nodeSpec = dict(
        pad=25,
        thickness=15,
        line=dict(color="black", width=0.5),
        label=nodes,
        color=nodesColors.tolist(),
        customdata=capacities,
        hovertemplate='%{label} has %{customdata} capacity installed',
        x=x,
        y=y,
    )
    linkSpec = dict(
        source=sources,
        target=targets,
        value=values,
        color=linksColors.tolist(),
    )
    trace = go.Sankey(
        arrangement="snap",
        valuesuffix="kWh",
        node=nodeSpec,
        link=linkSpec,
    )
    return [trace]
def createSankeyData(dataDict, keys, UseLabelDict, labelDict, PositionDict, buildings=None, mergedLinks=False):
    """Convert the result sheets into the node/link lists of a Sankey diagram.

    Every column of every processed sheet whose name is the string form of a
    ``(source, target)`` pair is one candidate link; its value is the sum of
    the column. Links whose source contains ``"Resource"``, whose target
    contains ``"exSolar"``, whose ends map to the same label, or whose summed
    value is below 0.001 are dropped.

    :param dataDict: mapping of sheet name to DataFrame.
    :param keys: iterable of sheet names to process.
    :param UseLabelDict: not used; kept for interface compatibility.
    :param labelDict: mapping of component name to node label; must contain
        the keys ``"electricityLink"``, ``"shLink"`` and ``"dhwLink"``.
    :param PositionDict: mapping of label fragment to an ``(x, y)`` pair.
    :param buildings: list of building numbers to include (default: none).
    :param mergedLinks: when true, the merged buses are attached to
        Building 1 for representation purposes.
    :return: tuple ``(nodes, sources, targets, values, x, y)``; ``sources``
        and ``targets`` hold indices into ``nodes``.
    """
    if buildings is None:
        buildings = []

    sources = []  # index of the source node of each link
    targets = []  # index of the target node of each link
    values = []
    nodes = []
    # (x, y) is the position of a node; one entry is appended for every
    # PositionDict key that matches the node label.
    x = []
    y = []
    mergedComponents = ["electricityBus", "electricityInBus", "domesticHotWaterBus", "dhwDemandBus", "spaceHeatingBus", "shDemandBus", "excesselectricityBus", "producedElectricity", "spaceHeating", "domesticHotWater"]

    def registerNode(nodeName, gridProbeName):
        """Return the index of ``nodeName``, adding it and its position if new.

        ``gridProbeName`` is the label inspected for "grid"/"Grid" when
        choosing the vertical band of the node.
        """
        if nodeName in nodes:
            return nodes.index(nodeName)
        nodes.append(nodeName)
        for posKey, position in PositionDict.items():
            # The prefix comparison keeps "HP" from matching "CHP".
            if not (posKey in nodeName and posKey[0:2] == nodeName[0:2]):
                continue
            x.append(position[0])
            if (labelDict["electricityLink"] in nodeName
                    or labelDict["shLink"] in nodeName
                    or labelDict["dhwLink"] in nodeName):
                y.append((0.5 - position[1]) / len(buildings))
                continue
            if ("grid" in gridProbeName or "Grid" in gridProbeName) and mergedLinks:
                buildingNumber = 1
            else:
                # Labels end in "_B<number>"; use that building's band.
                buildingNumber = buildings.index(int(nodeName.split('_')[-1][1:]))
            y.append(position[1] / len(buildings) + buildingNumber / len(buildings))
        return len(nodes) - 1

    for key in keys:
        df = dataDict[key]
        # NOTE(review): mergedComponents is a non-empty list, so this check
        # can never be true; possibly ``mergedLinks`` was meant - confirm.
        if "dhwStorageBus" in key and not mergedComponents:
            continue
        if all(str(i) not in key for i in buildings) and key not in mergedComponents:
            continue
        for dfKey in df.keys():
            if isinstance(dfKey, int) or "storage_content" in dfKey:
                continue
            dfKeySplit = dfKey.split("'")
            sourceNodeName = dfKeySplit[1]
            targetNodeName = dfKeySplit[3]
            if mergedLinks:
                # For the sake of representation the merged buses (if
                # present) are added to Building 1.
                if sourceNodeName in mergedComponents:
                    sourceNodeName = sourceNodeName + '__Building1'
                if targetNodeName in mergedComponents:
                    targetNodeName = targetNodeName + '__Building1'
            if sourceNodeName in labelDict:
                sourceNodeName = labelDict[sourceNodeName]
            if targetNodeName in labelDict:
                targetNodeName = labelDict[targetNodeName]
            if sourceNodeName == targetNodeName:
                continue
            if "exSolar" in targetNodeName:
                continue
            if "Resource" in sourceNodeName:
                continue
            value = sum(df[dfKey].values)
            if value < 0.001:
                continue
            values.append(value)
            sources.append(registerNode(sourceNodeName, sourceNodeName))
            # NOTE(review): the target is also placed according to whether
            # the *source* is a grid node, exactly as before; confirm that
            # this is intended and not a copy/paste slip.
            targets.append(registerNode(targetNodeName, sourceNodeName))

    return nodes, sources, targets, values, x, y
def createColorList(inputList, ColorDict, labels):
    """Pick a colour for every node label according to its energy carrier.

    Each label is tested against the carriers in the order gas, sh, dhw,
    elec; the first carrier with a matching substring wins, and labels
    matching none get ``ColorDict["other"]``.

    :param inputList: iterable of node labels.
    :param ColorDict: mapping with the keys ``"gas"``, ``"sh"``, ``"dhw"``,
        ``"elec"`` and ``"other"``.
    :param labels: either the string ``'default'`` (built-in substrings are
        used) or a mapping of label keys to the substrings to look for.
    :return: list of colours, parallel to ``inputList``.
    """
    # carrier, keys looked up in a custom ``labels`` mapping, default substrings
    carrierRules = (
        ("gas",
         ("naturalGas",),
         ("natGas",)),
        ("sh",
         ("excessSh", "prodSH", "shBus", "StorageSh", "DemandSh"),
         ("shLink", "prodSH", "shBus", "shStor", "Q_sh", "exSh", "usedSH")),
        ("dhw",
         ("solarCollector", "excessSolarCollector", "StorageDhw", "dhwBus", "DemandDhw"),
         ("dhwLink", "solar", "exSolar", "dhwStor", "dhwBus", "Q_dhw", "prodDHW")),
        ("elec",
         ("elBus", "grid", "pv", "prodEl", "localEl", "StorageEl", "excessEl", "DemandEl", "DemandMob"),
         ("elLink", "grid", "pv", "prodEl", "localEl", "Bat", "exEl", "usedEl", "Q_el", "Q_mob")),
    )
    # The mode does not depend on the node, so decide it once.
    useCustom = labels != 'default'
    useDefault = labels == 'default'

    colorsList = []
    for n in inputList:
        carrier = "other"
        for name, customKeys, defaultTokens in carrierRules:
            # Generators keep the lookups lazy: a custom mapping is only
            # asked for a key when every earlier test has failed.
            if (useCustom and any(labels[k] in n for k in customKeys)) or \
                    (useDefault and any(token in n for token in defaultTokens)):
                carrier = name
                break
        colorsList.append(ColorDict[carrier])
    return colorsList
def displaySankey(fileName, UseLabelDict, labelDict, positionDict, labels, buildings, mergedLinks, hideBuildingNumber):
    """Create the Sankey figure for the given results file.

    :param fileName: results file handed to ``readResults``; also used in the
        figure title.
    :param buildings: list of building numbers shown in the diagram.
    :param hideBuildingNumber: when true, everything after the first ``_`` is
        removed from node labels containing ``"_B"``.
    :return: the plotly figure.

    The remaining parameters are passed through to ``readResults``.
    """
    OPACITY = 0.6
    baseColors = (
        ("elec", "skyblue"),
        ("gas", "darkgray"),
        ("dhw", "red"),
        ("sh", "magenta"),
        ("other", "deeppink"),
    )
    ColorDict = {name: 'rgba' + str(colors.to_rgba(base, OPACITY)) for name, base in baseColors}

    data = readResults(fileName, buildings, ColorDict, UseLabelDict, labelDict, positionDict, labels, mergedLinks)
    node = data[0]['node']
    link = data[0]['link']

    if hideBuildingNumber:
        node['label'] = tuple(s if "_B" not in s else s.split("_")[0] for s in node['label'])

    # Other possible arrangements: snap, freeform, fixed.
    fig = go.Figure(go.Sankey(arrangement="perpendicular", link=link, node=node))

    fig.update_layout(
        title=fileName + " for buildings " + str(buildings),
        font=dict(size=10, color='black'),
        paper_bgcolor='rgba(0,0,0,0)',
        plot_bgcolor='rgba(0,0,0,0)',
    )

    # Transparent lines at 0 and 1 bracket the dashed separators.
    fig.add_hline(y=0, line_color='rgba(0,0,0,0)')
    # One dashed separator between every pair of neighbouring building bands.
    # Each band is 1/len(buildings) high, so the boundaries sit at k/n; this
    # covers any number of buildings, not only 2, 3 and 4.
    buildingCount = len(buildings)
    for boundary in range(1, buildingCount):
        fig.add_hline(y=boundary / buildingCount, line_dash="dash")
    fig.add_hline(y=1.0, line_color='rgba(0,0,0,0)')

    fig.update_xaxes(visible=False)
    fig.update_yaxes(visible=False)
    return fig
def plot(excelFileName, outputFileName, numberOfBuildings, UseLabelDict, labels, optimType, mergedLinks=False, hideBuildingNumber=False):
    """Draw the Sankey diagram of a results file, show it and save it as HTML.

    Buildings are numbered ``1 .. numberOfBuildings``. The label and position
    dictionaries come from ``labelDictGenerator`` and
    ``positionDictGenerator``; the figure itself is built by
    ``displaySankey``, shown, and then written to ``outputFileName``.
    """
    buildingNumbers = [*range(1, numberOfBuildings + 1)]
    nodeLabels = labelDictGenerator(numberOfBuildings, labels, optimType, mergedLinks)
    nodePositions = positionDictGenerator(labels, optimType, mergedLinks)
    figure = displaySankey(
        excelFileName,
        UseLabelDict,
        nodeLabels,
        nodePositions,
        labels,
        buildingNumbers,
        mergedLinks,
        hideBuildingNumber,
    )
    figure.show()
    figure.write_html(outputFileName)
if __name__ == "__main__":
    # Example run: plot one optimization result from the data folder.
    optMode = "group"  # whether the results file corresponds to "indiv" or "group" optimization
    numberOfBuildings = 4
    plotOptim = 3  # number of the optimization to plot
    UseLabelDict = True

    # Build the paths from separate components so that they are valid on
    # every platform, not only where "\" is the path separator.
    excelFileName = os.path.join("..", "data", "Results", f"results{numberOfBuildings}_{plotOptim}_{optMode}.xlsx")
    outputFileName = os.path.join("..", "data", "figures", f"Sankey_{plotOptim}.html")

    plot(excelFileName, outputFileName, numberOfBuildings, UseLabelDict, labels='default', optimType=optMode)