#####################################################################
# DOC #
#####################################################################
"""
@author: F. Ramognino <federico.ramognino@polimi.it>
Last update: 12/06/2023
"""
#####################################################################
# IMPORT #
#####################################################################
#Typing
from __future__ import annotations
from types import FunctionType
from typing import Iterable, Callable
from matplotlib.axes import Axes
#Other
from operator import attrgetter
import os
from tqdm import tqdm
import traceback
#Plotting
import numpy as np
import pandas as pd
import libICEpost as lice
#Base class
from libICEpost.src.base.BaseClass import BaseClass
from libICEpost.src.base.dataStructures.EngineData.EngineData import EngineData
#I/O and pre-processing
from libICEpost.src.base.dataStructures.Dictionary import Dictionary
from libICEpost.src.base.Filter.Filter import Filter
#Submodels
from libICEpost.src.engineModel.EngineTime.EngineTime import EngineTime
from libICEpost.src.engineModel.EngineGeometry.EngineGeometry import EngineGeometry
from libICEpost.src.thermophysicalModels.thermoModels.thermoMixture.ThermoMixture import ThermoMixture
from libICEpost.src.thermophysicalModels.thermoModels.ThermoModel import ThermoModel
from libICEpost.src.thermophysicalModels.thermoModels.CombustionModel.CombustionModel import CombustionModel
from libICEpost.src.thermophysicalModels.thermoModels.EgrModel.EgrModel import EgrModel
from libICEpost.src.engineModel.HeatTransferModel.HeatTransferModel import HeatTransferModel
from libICEpost.src.thermophysicalModels.thermoModels.CombustionModel.NoCombustion import NoCombustion
from libICEpost.src.engineModel.HeatTransferModel.Woschni import Woschni
#Database
from libICEpost.Database.chemistry.specie.Mixtures import Mixtures, Mixture
#Errors
from libICEpost.src.base.Functions.runtimeWarning import helpOnFail
#############################################################################
# FUNCTIONS #
#############################################################################
[docs]
def get_postfix(zone:str) -> str:
"""
Return the postfix to append to a corresponding zone. By default, no postfix corresponds to 'cylinder' zone, while '_<zone>' to all the others.
Args:
zone (str): The zone to which compute the postfix to append to the data of that zone.
Returns:
str: The postfix
"""
return "" if zone == "cylinder" else f"_{zone}"
#############################################################################
# MAIN CLASSES #
#############################################################################
# TODO:
# Handle direct injection (injectionModel?)
# Handle interaction with other zones (creviceModel? prechamberModel?)
# NOTE: to handle diesel combustion, need to compute the phi from the injected mass
# (probably the main parameter for the combustion model)
# NOTE: This model handles a single-zone model of the cylinder. Sub-classes may be
# defined to introduce additional zones, like in case of pre-chamber engines, crevice
# modeling, or maybe gas-exchange analysis (ducts)
[docs]
class EngineModel(BaseClass):
"""
Base class for modeling of an engine and processing experimental/numerical data
NOTE:
For naming of variables:
-> By default they refer to the "cylinder" zone
-> Variables referred to a specific zone are allocated as "<variableName>_<zoneName>"
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
TODO: documentation and user info
"""
#Attibutes:
_cylinder:ThermoModel
"""Cylinder thermodynamic region"""
Types:dict[str:type] = \
{
"EngineGeometry": EngineGeometry,
"EngineTime": EngineTime,
"EgrModel": EgrModel,
"CombustionModel": CombustionModel,
"HeatTransferModel": HeatTransferModel,
}
"""Types for each main model"""
Submodels:dict[str:type] = \
{
"EgrModel": EgrModel(), # 0% EGR
"CombustionModel": NoCombustion(reactants=Mixture.empty()), #Inhert (motoring)
"HeatTransferModel": Woschni(), # Woschni model with default coeffs.
}
"""The available sub-models and their default initializers"""
Zones:list[str] = \
[
"cylinder"
]
"""The zones avaliable in the model"""
thermophysicalProperties:Dictionary
"""Dictionary with thermophysical properties for mixtures"""
combustionProperties:Dictionary
"""Dictionary with properties for combustion modeling and chemical composition of mixtures"""
CombustionModel:CombustionModel
"""The combustion model"""
EgrModel:EgrModel
"""The EGR model"""
HeatTransferModel:HeatTransferModel
"""The wall heat transfer model"""
_air:Mixture
"""Air mixture"""
info:Dictionary
"""General information for pre-post processing"""
#########################################################################
# Properties
@property
def raw(self)-> EngineData:
"""
The raw data
Returns:
EngineData
"""
return self._raw
#################################
@property
def data(self)-> EngineData:
"""
The processed/filtered data
Returns:
EngineData
"""
return self._data
#########################################################################
# Class methods
[docs]
@classmethod
def fromDictionary(cls, dictionary:Dictionary) -> EngineModel:
"""
Construct from dictionary like:
{
EngineTime: str
Name of the EngineTime model to use
<EngineTime>Dict: dict
Dictionary containing the data specific of the selected
SngineTime model (e.g., if engineTime is 'SparkIgnitionTime',
then this dictionary must be named 'SparkIgnitionTimeDict').
See at the helper for function 'fromDictionary' of the specific
EngineTime model selected.
EngineGeometry: str
Name of the EngineGeometry model to use
<EngineGeometry>Dict: dict
Dictionary with data required from engineGeometry.
See at the helper for function 'fromDictionary' of the specific
EngineGeometry model selected.
thermoPhysicalProperties: dict
Dictionary with types and data for thermophysical modeling of mixtures
{
ThermoType: dict
{
Thermo: str
EquationOfState: str
}
<Thermo>Dict: dict
<EquationOfState>Dict: dict
}
combustionProperties: dict
Dictionaries for data required for mixture preparation and combustion modeling.
{
injectionModels: dict
{
TODO
},
air: Mixture (default: database.chemistry.specie.Mixtures.dryAir)
The air mixture composition
initialMixture: dict
Dictionary with data for initialization of the mixture
in the thermodynamic zones
{
<zoneName>:
{
[depends on specific engine model]
}
},
CombustionModel: str
Name of the CombustionModel to use
<CombustionModel>Dict: dict
Dictionary with data required from CombustionModel
See at the helper for function 'fromDictionary' of the specific
CombustionModel model selected.
}
dataDict (dictionary): Dictionary with info for loading data, pre-processing and setting initial conditions.
{
TODO
}
functionObjects (Iterable[tuple[str,dict]]): Function object to apply at every time-loop
[
tuple[str, dict]: FunctionObjectType, constructionDictionary
]
}
"""
cls.checkType(dictionary, [dict, Dictionary], "dictionary")
if isinstance(dictionary, dict):
dictionary = Dictionary(dictionary)
print("Constructing engine model from dictionary\n")
#Engine time:
print("Construct EngineTime")
etModel = dictionary.lookup("EngineTime")
ET = EngineTime.selector(etModel, dictionary.lookup(etModel + "Dict"))
print(ET,"\n")
#EngineGeometry:
print("Construct EngineGeometry")
egModel = dictionary.lookup("EngineGeometry")
EG = EngineGeometry.selector(egModel, dictionary.lookup(egModel + "Dict"))
print(EG,"\n")
#combustionProperties
combustionProperties = dictionary.lookup("combustionProperties")
#thermophysical properties
thermophysicalProperties = dictionary.lookup("thermophysicalProperties")
#Data for pre-processing
dataDict = dictionary.lookup("dataDict")
#Submodels
subModels = {}
smDict = dictionary.lookupOrDefault("submodels", Dictionary())
for sm in cls.Submodels:
if sm + "Type" in smDict:
print(f"Constructing {sm} sub-model")
smTypeName = smDict.lookup(sm + "Type")
print(f"\tType: {smTypeName}")
subModels[sm] = cls.Types[sm].selector(smTypeName, smDict.lookup(smTypeName + "Dict"))
#Function objects
from libICEpost.src.engineModel.functionObjects import FunctionObject
functionObjects = []
foList = dictionary.lookupOrDefault("functionObjects", default=[])
for ii, fo in enumerate(foList):
cls.checkType(fo, tuple, "dictionary[functionObjects][{ii}]")
if not len(fo) == 2: raise ValueError(f"dictionary[functionObjects][{ii}] must be of length 2 with FunctionObjectType and corresponding construction dictionary")
cls.checkType(fo[0], str, f"dictionary[functionObjects][{ii}][0]")
cls.checkType(fo[1], dict, f"dictionary[functionObjects][{ii}][1]")
functionObjects.append(FunctionObject.selector(*fo))
out = cls(time=ET, geometry=EG, thermophysicalProperties=thermophysicalProperties, combustionProperties=combustionProperties, dataDict=dataDict, functionObjects=functionObjects, **subModels)
return out
#########################################################################
#Constructor:
def __init__(self, *,
time:EngineTime,
geometry:EngineGeometry,
thermophysicalProperties:dict|Dictionary,
combustionProperties:dict|Dictionary,
dataDict:dict=None,
functionObjects:Iterable[Callable]=None,
**submodels,
):
"""
Base class for engine model, used for type-checking and loading the sub-models.
Args:
time (EngineTime): The engine time
geometry (EngineGeometry): The engine geometry
thermophysicalProperties (dict|Dictionary): Dictionary with thermophysical properties of mixtures
combustionProperties (dict|Dictionary): Dictionary with combustion data and chemical composition
dataDict (dict, optional): Dictionary for loading data. If not given, data are not loaded
and thermodynamic regions not initialized. Defaults to None.
functionObjects (Iterable[Callable], optional): A list of function objects to apply at
every time-step. They must be functions takining the EngineModel (self) as *ONLY* input and
returning None. These can be used to further post-processing (example, save the mixture
compositions of the thermodynamic models)
**submodels (dict, optional): Optional sub-models to load. Defaults to {}.
"""
#Main models
self.checkType(geometry, self.Types["EngineGeometry"], "geometry")
self.checkType(time, self.Types["EngineTime"], "engineTime")
self.geometry = geometry
self.time = time
#Data structures
self._raw = EngineData() #Raw data
self._data = EngineData() #Filtered data
#Submodels
for model in self.Submodels:
if model in submodels:
#Get from input
sm = submodels[model]
self.checkType(sm, self.Types[model], f"{submodels}[{model}]")
else:
#Take default
sm = self.Submodels[model].copy()
#Set sub-model
self.__setattr__(model, sm)
#Thermos
self.checkType(thermophysicalProperties, dict, "thermophysicalProperties")
if isinstance(thermophysicalProperties, dict):
thermophysicalProperties = Dictionary(**thermophysicalProperties)
self.thermophysicalProperties = thermophysicalProperties.copy()
#Combustion properties
self.checkType(combustionProperties, dict, "combustionProperties")
if isinstance(combustionProperties, dict):
combustionProperties = Dictionary(**combustionProperties)
self.combustionProperties = combustionProperties.copy()
#Contruct the thermodynamic models
self._constructThemodynamicModels(combustionProperties)
#TODO: construct injection models
#Construct the Egr model
self._constructEgrModel(combustionProperties)
#Construct the combustion model
self._constructCombustionModel(combustionProperties)
#Misc parameters
self.info = Dictionary()
self.info["path"] = None
self.info["dataDict"] = None
self.info["filter"] = None
self.info["initialConditions"] = None
#Function objects
self.checkArray(functionObjects, Callable, "functionObjects")
self.info["functionObjects"] = [fo for fo in functionObjects]
#Pre-processing
if not dataDict is None:
self.preProcess(**dataDict)
#########################################################################
#Construction methods:
[docs]
def _constructThemodynamicModels(self, combustionProperties:dict|Dictionary) -> EngineModel:
"""
Construct the thermodynamic models of the system, setting their initial
mixture composition. Here setting everything to air, might be overwritten
in sub-classes to handle specific initializations (SI engine will use
the premixedFuel entry)
Args:
combustionProperties (dict|Dictionary): the combustion properties
Returns:
EngineModel: self
"""
self.checkType(combustionProperties, dict, "combustionProperties")
if not isinstance(combustionProperties, Dictionary):
combustionProperties = Dictionary(**combustionProperties)
#Air composition
air = combustionProperties.lookupOrDefault("air", Mixtures.dryAir)
self._air = air.copy()
#Here set everything to air, in sub-classes update
for zone in self.Zones:
self.__setattr__("_" + zone, ThermoModel(ThermoMixture(self._air.copy(), **self.thermophysicalProperties)))
return self
####################################
[docs]
def _constructEgrModel(self, combustionProperties:dict|Dictionary):
"""
Construct the EGR model and apply it to the cylinder region.
Might be overwritten in child for applying EGR also to sub-regions of cylinder.
Args:
combustionProperties (dict|Dictionary): the combustion properties
Returns:
EngineModel: self
"""
print("Constructing EGR model")
self.checkType(combustionProperties, dict, "combustionProperties")
if not isinstance(combustionProperties, Dictionary):
combustionProperties = Dictionary(**combustionProperties)
if "EgrModel" in combustionProperties:
#Construct egr model from combustion properties
egrModelType:str = combustionProperties.lookup("EgrModel")
egrModelDict = combustionProperties.lookupOrDefault(egrModelType + "Dict", Dictionary())
egrModelDict.update(reactants=self._cylinder.mixture.mix) #Append to dictionary the cylinder properties
else:
#Use default
self.EgrModel = self.Submodels["EgrModel"].copy().update(reactants=self._cylinder.mixture.mix)
#NOTE: When introducing the injection models, need to compute EVO composition instead
#Construct the EGR model
self.EgrModel = EgrModel.selector(egrModelType, egrModelDict)
print(f"\tType: {self.EgrModel.__class__.__name__}")
#Apply EGR to reactants
self._cylinder.mixture.mix.dilute(self.EgrModel.EgrMixture, self.EgrModel.egr)
####################################
[docs]
def _constructCombustionModel(self, combustionProperties:dict|Dictionary):
"""
Construct the combustion model.
Might be overwritten in child to set additional parameters to combustionModelDict
(fuel for PremixedCombustion model is updated in SparkIgnitionEngine)
Args:
combustionProperties (dict|Dictionary): the combustion properties
Returns:
EngineModel: self
"""
print("Constructing combustion model")
self.checkType(combustionProperties, dict, "combustionProperties")
if not isinstance(combustionProperties, Dictionary):
combustionProperties = Dictionary(**combustionProperties)
combustionModelType = combustionProperties.lookupOrDefault("CombustionModel", None, fatal=False)
if not combustionModelType is None:
#Get dictionary
combustionModelDict = combustionProperties.lookupOrDefault(combustionModelType + "Dict", Dictionary())
combustionModelDict.update( #Append reactants to combustion model dictionary
reactants=self._cylinder.mixture.mix #Initial in-cylinder mixture
)
#Construct
self.CombustionModel = CombustionModel.selector(combustionModelType, combustionModelDict)
else:
#Use default
self.CombustionModel = self.Submodels["CombustionModel"].copy().update(reactants=self._cylinder.mixture.mix)
print(f"\tType: {self.CombustionModel.__class__.__name__}")
#Check consistency
if not isinstance(self.CombustionModel, self.Types["CombustionModel"]):
raise TypeError(f"Combustion model type {combustionModelType} in combustionProperties dictionaries not compatible with allowed type for engine model {self.__class__.__name__} ({self.Types['CombustionModel']})")
#########################################################################
#Updating methods:
[docs]
def _updateMixtures(self) -> None:
"""
Update mixture compositions (might be overwritten in child classes)
"""
#TODO: Update the in-cylinder mixture based on injection models (may have already injected some mass)
#Update the combustion model at current time
self._updateCombustionModel()
#Update in-cylinder mixture based on combustion model current mixture
self._cylinder.mixture.update(self.CombustionModel.mixture)
####################################
[docs]
def _updateCombustionModel(self):
"""
Update combustion model
"""
#TODO: Update the reactants mixture based on injection models (may have already injected some mass)
#All data
index = self.data.index[self.data.loc[:,'CA'] == self.time.time].tolist()[0]
data = self.data.loc[index].to_dict()
self.CombustionModel.update(**data) #NOTE: update also fuel when implementing injection models
#########################################################################
#Dunder methods:
[docs]
def __str__(self):
STR = ""
STR += "Engine model instance:\n"
STR += "Engine time\n\t" + self.time.__str__().replace("\n", "\n\t")
STR += "\n"
STR += "Engine geometry:\n\t" + self.geometry.__str__().replace("\n", "\n\t")
STR += "\n"
STR += "EGR model:\n\t" + self.EgrModel.__str__().replace("\n", "\n\t")
STR += "\n"
STR += "Combustion model:\n\t" + self.CombustionModel.__str__().replace("\n", "\n\t")
return STR
#########################################################################
#Pre-processing methods:
[docs]
def _loadFile(self,*args,**argv) -> EngineModel:
"""
Loads a file with raw data to self.raw. See EngineData.loadFile
documentation for arguments:
"""
self.raw.loadFile(*args,**argv)
return self
####################################
[docs]
def _loadArray(self,*args,**argv):
"""
Loads an array with raw data to self.raw. See EngineData.loadArray
documentation for arguments:
"""
self._raw.loadArray(*args,**argv)
return self
####################################
[docs]
def loadData(self, dataPath:str=None, *, data:dict|Dictionary) -> EngineModel:
"""
Load raw data.
TODO: Info
Args:
data (dict | Dictionary): Dictionary containing the data to load for each region.
dataPath (str, optional): Global path where to load/write data. Defaults to None.
Returns:
EngineModel: self
"""
print("Loading data")
print(f"Data path: {dataPath}")
#Seth path info
self.info["dataPath"] = dataPath
#Cast to Dictionary
data = Dictionary(**data)
self.info["data"] = data
#Load data:
for zone in self.Zones:
zoneDict = data.lookup(zone)
#Check that pressure is found (mandatory)
if (not "p" in zoneDict) and (not "p" in self.raw.columns):
raise ValueError(f"Mandatory entry 'p' in data dictionary for zone {zone} not found. Pressure trace must be loaded for each thermodynamic region.")
#Loop over data to be loaded:
for entry in zoneDict:
dataDict = zoneDict.lookup(entry)
self.checkType(dataDict, Dictionary, f"{zone}[{entry}]")
#If the region is not cylinder, append its name to the field
entryName:str = entry + get_postfix(zone)
currData:Dictionary = dataDict.lookup("data")
opts:Dictionary = currData.lookupOrDefault("opts", Dictionary())
#Get format
dataFormat:str = dataDict.lookup("format")
if (dataFormat == "file"):
#File
fileName = currData.lookup("fileName")
#relative to dataPath if given
fileName = (dataPath + os.path.sep if dataPath else "") + fileName
#Load
self._loadFile(fileName, entryName, **opts)
elif (dataFormat == "array"):
#(CA,val) array
dataArray = currData.lookup("array")
self._loadArray(dataArray, entryName, **opts)
elif (dataFormat == "function"):
#Function f(CA)
function:FunctionType = currData.lookup("function")
self.checkType(function, FunctionType, f"{zone}[{entry}][function]")
CA = self.raw.loc[:,"CA"]
f = CA.apply(function)
self._loadArray(np.array((CA,f)).T, entryName, **opts)
elif (dataFormat == "uniform"):
#Uniform value
value:float = currData.lookup("value")
self.checkType(value, float, f"{zone}[{entry}][value]")
self.raw.loc[:,entryName] = value
elif (dataFormat == "calc"):
#Apply operation between alredy loaded data
function:FunctionType = currData.lookup("function")
self.checkType(function, FunctionType, f"{zone}[{entry}][function]")
#Function arguments
argNames:list[str] = function.__code__.co_varnames[:function.__code__.co_argcount]
#Check if are present:
for arg in argNames:
if not arg in self.raw.columns:
raise ValueError(f"Field '{arg}' was not loaded.")
#Extract corresponding columns from data-frame:
cols = {c:self.raw.loc[:,c] for c in argNames}
CA = self.raw.loc[:,"CA"]
f = function(**cols)
self._loadArray(np.array((CA,f)).T, entryName, **opts)
else:
raise ValueError(f"Unknown data format '{dataFormat}' for entry {zone}[{entry}]")
return self
####################################
[docs]
def filterData(self, filter:"Filter|FunctionType|None"=None) -> EngineModel:
"""
filter: Filter|FunctionType|None (optional)
Filter to apply to raw data (e.g. resampling, low-pass filter, etc.). Required
a method __call__(xp, yp)->(x,y) that resamples the dataset (xp,yp) to the
datapoints (x,y).
Filter the data in self.raw. Save the corresponding
filtered data to self.data.
If filter is None, data are cloned from self.raw
"""
#Save filter
self.info["filter"] = filter
#Clone if no filter is given
if filter is None:
for field in self._raw.columns:
self._data.loadArray(np.array((self._raw.loc[:,"CA"],self._raw.loc[:,field])).T, field)
return self
#Apply filter
print(f"Applying filter {filter if isinstance(filter,Filter) else filter.__name__}")
for var in self._raw.columns:
#Filter data
if var != "CA":
self._data.loadArray(np.array(filter(self._raw.loc[:,"CA"], self._raw.loc[:,var])).T, var)
return self
####################################
[docs]
def initializeThemodynamicModels(self, **initialConditions) -> EngineModel:
"""
Set the initial conditions of all thermodynamic regions of the EngineModel.
For region to be initialized, a dict is given for the inital conditions,
according to the following convention:
-> If a float is given, the value is used
-> If a str is given, it refers to the name of the vabiables stored in the EngineModel,
in which case the corresponding initial condition is sampled from the the corresponding
data-set at self.time.startTime.
-> If string starting with @ is given, it applies that method with input (self.time.startTime)
Ex:
{
"pressure": "p", #This interpolates self.data.p at self.time.startTime
"mass": 1.2e-3, #Value
"volume": "@geometry.V" #Evaluates self.geometry.V(self.time.startTime)
}
Args:
**initialConditions: data initialization of each zone in the model.
"""
#Update start-time of engine time so that it is bounded to first avaliable time-step
if not "CA" in self.data.columns:
raise ValueError("No data loaded yet.")
#Set start-time
self.time.updateStartTime(self.data.loc[:,"CA"])
self.info["time"] = self.time.time
#Update the mixtures at start-time (combustion models, injection models, etc.)
self._updateMixtures()
initialConditions = Dictionary(**initialConditions)
#Store initial conditions
self.info["initialConditions"] = initialConditions
for zone in self.Zones:
zoneDict = initialConditions.lookup(zone)
self.checkType(zoneDict, dict, "zoneDict")
attrgetter("_" + zone)(self).initializeState(**self._preprocessThermoModelInput(zoneDict, zone=zone))
return self
####################################
####################################
[docs]
def preProcess(self, dataPath:str=None, *, data:dict|Dictionary, preProcessing:dict|Dictionary=None, initialConditions:dict|Dictionary, **junk) -> EngineModel:
"""
Pre-processing:
1) Loading data (from files or arrays)
2) Pre-process the data (filtering, optional)
3) Initialize thermodynamic regions
NOTE:
Naming of variables when loading data as follows:
-> By default they refer to the "cylinder" zone
-> Variables referred to a specific zone are allocated as "<variableName>_<zoneName>"
TODO: example
Args:
data (dict | Dictionary): Dictionary with info for loading data.
preProcessing (dict | Dictionary, optional): Dictionary with pre-processing information. Defaults to None.
dataPath (str, optional): Master path of the tree. Defaults to None.
initialConditions (dict | Dictionary): Dictionary with initial condition for thermodynamic models
Returns:
EngineModel: self
"""
#NOTE: **junk used to have other miscellaneous during contruction from dictionary.
print("Pre-processing")
#Loading data:
self.loadData(dataPath, data=data)
# Filtering data
self.info["preProcessing"] = preProcessing
filter = None
if not preProcessing is None:
preProcessing = Dictionary(**preProcessing)
filterType = preProcessing.lookupOrDefault("Filter", None, fatal=False)
if isinstance(filterType, str):
#Got type name for run-time construction
filter:Filter = Filter.selector(filterType, preProcessing.lookup(f"{filterType}Dict"))
elif isinstance(filterType, Filter):
#Got filter item
filter = filterType
else:
#No filtering
pass
self.filterData(filter)
#Initial conditions for thermodinamic models:
self.initializeThemodynamicModels(**initialConditions)
return self
#########################################################################
#Processing methods:
[docs]
def process(self) -> EngineModel:
"""
Process the data, main time-loop.
This is split into two function calls, which may be overwritten in child classes to tailored processings:
1) _process__pre__: Create the columns in self.data for the fields generted by post-processing
2) _update: The state-updating procedure in the main time-loop
3) _process__post__: Final post-processing (e.g., computation of wall heat fluxes and rohr)
Returns:
EngineModel: self
"""
print("")
print("Processing")
print("startTime:",self.time.startTime)
print("endTime:",self.time.endTime)
#Create fields
self._process__pre__()
#Process cylinder data
for t in tqdm(self.time(self.data.loc[:,"CA"]), "Progress: ", initial=0, total=(self.time.endTime-self.time.startTime), unit="CAD"): #With progress bar :)
self.info["time"] = t
#Break the time loop if the updating fails:
try:
self._update() #Update
self._storeLatestTime() #Save results at this time
[fo(self) for fo in self.info["functionObjects"]] #Compute function objects
except BaseException as err:
self.runtimeError(f"Failed updating engine model at time {t}",stack=True)
print(traceback.format_exc())
break
#Final updates (heat transfer, cumulatives, etc...)
self._process__post__()
return self
####################################
[docs]
def _process__pre__(self) -> None:
"""
Creation of the post-processed fields.
NOTE:
When overwriting, first call this method:
def _process__pre__(self) -> None:
super()._process__pre__()
...
"""
#Add fields to data:
fields = {"dpdCA", "AHRR", "ROHR", "A"}
for zone in self.Zones:
fields |= {v + get_postfix(zone) for v in getattr(self, f"_{zone}").state.__dict__}
for f in fields:
if not f in self.data.columns:
self.data.loc[:,f] = float("nan")
#Loop over zones to set mixture compositions
for zone in self.Zones:
postfix = get_postfix(zone)
Z:ThermoModel = getattr(self, f"_{zone}")
# #Specie
# for specie in Z.mixture.mix:
# self.data.loc[:,specie.specie.name + "_x" + postfix] = 0.0
# self.data.loc[:,specie.specie.name + "_y" + postfix] = 0.0
#Store at startTime
self._storeLatestTime()
####################################
[docs]
def _update(self) -> None:
"""
Method for updating the state during the time-loop. Here updating
cylinder as single-zone model without interaction with other regions.
Could be overwritten for more detailed models (e.g., two-zone SI model, TJI, etc.)
NOTE:
When overwriting, afterwards call this method:
def _update(self) -> None:
...
super()._update()
"""
#TODO injection models for mass end energy souce terms
#TODO heat transfer models for temperature (open systems only!)
#Current time
CA = self.time.time
index = self.data.index[self.data.loc[:,'CA'] == CA].tolist()[0]
#Update state
p = self.data.loc[index,"p"]
V = self.geometry.V(CA)
dpdCA = (self.data.loc[index,"p"] - self.data.loc[index-1,"p"])/self.time.deltaT
self._cylinder.update(pressure=p, volume=V)
self._updateMixtures()
#Apparent heat release rate [J/CA]
#Generalization to allow other EoS
T = self._cylinder.state.T
m = self._cylinder.state.m
TOld = self.data.loc[index-1,"T"]
pOld = self.data.loc[index-1,"p"]
mOld = self.data.loc[index-1,"m"]
#Apporximating Us derivative backwards in time
dUsdCA = (self._cylinder.mixture.us(p,T)*m - self._cylinder.mixture.us(pOld,TOld)*mOld)/self.time.deltaT
ahrr = dUsdCA + p*self.geometry.dVdCA(CA) #- dmIndCA*mixtureIn.hs(p,T) + dmOutdCA*mixtureOut.hs(p,T)
#Store main parameters
self.data.loc[index, "dpdCA"] = dpdCA
self.data.loc[index, "AHRR"] = ahrr
####################################
[docs]
def _storeLatestTime(self):
"""
Store all properties at latest time.
"""
#Current time
CA = self.time.time
index = self.data.index[self.data.loc[:,'CA'] == CA].tolist()
#Loop over zones
for zone in self.Zones:
postfix = get_postfix(zone)
Z:ThermoModel = getattr(self, f"_{zone}")
#Properties
self.data.loc[index, Z.state.__dict__.keys()] = [Z.state.__dict__[key] for key in Z.state.__dict__.keys()]
#Specie
# for specie in Z.mixture.mix:
# self.data.loc[index, specie.specie.name + "_x" + postfix] = specie.X
# self.data.loc[index, specie.specie.name + "_y" + postfix] = specie.Y
####################################
[docs]
def _process__post__(self) -> None:
"""
Computing wall heat fluxes and rohr
NOTE:
When overwriting, first call this method:
def _process__post__(self) -> None:
...
super()._process__post__()
"""
#WHF and ROHR
self._computeWallHeatFlux()
self.data.loc[:,"ROHR"] = self.data.loc[:,"AHRR"] + self.data.loc[:,"dQwalls"]
#Cumulatives
self.data.loc[:,"cumHR"] = self.cumulativeIntegral("ROHR")
self.data.loc[:,"cumAHR"] = self.cumulativeIntegral("AHRR")
####################################
[docs]
def _computeWallHeatFlux(self) -> None:
"""
Compute wall heat fluxes for each patch and global value in each region. Might be overloaded in child.
"""
areas = self.geometry.areas(self.data.loc[:,"CA"])
#Compute wall heat transfer coefficient:
h = self.HeatTransferModel.h(engine=self, CA=self.data.loc[:,"CA"])
self.data.loc[:,"heatTransferCoeff"] = h
#Total whf
self.data.loc[:,"dQwalls"] = 0.0
self.data.loc[:,"Qwalls"] = 0.0
self.data.loc[:,"wallsArea"] = 0.0
for patch in [c for c in areas.columns if not (c == "CA")]:
#Search temperature as "T<patchName>":
if f"T{patch}" in self.data.columns:
Twall = self.data.loc[:,f"T{patch}"]
#Fallback to default "Twalls":
elif "Twalls" in self.data.columns:
Twall = self.data.loc[:,"Twalls"]
else:
raise ValueError("Cannot compute wall heat flux. Either load patch temperatures in the form t<patchName> or default temperature Twalls to compute wall heat fluxes.")
#Compute patch area:
A = areas[patch]
self.data.loc[:,"wallsArea"] += A
name = patch + "Area"
if not name in self.data.columns:
self.data.loc[:,name] = A
#Compute wall heat flux at patch [converted to J/CA]:
self.data.loc[:,f"dQ{patch}"] = h * A * (self.data.loc[:,"T"] - Twall) / self.time.dCAdt
#Compute cumulative
self.data.loc[:,f"Q{patch}"] = self.cumulativeIntegral(f"dQ{patch}")
#Add to total
self.data.loc[:,"dQwalls"] += self.data.loc[:,f"dQ{patch}"]
self.data.loc[:,"Qwalls"] += self.data.loc[:,f"Q{patch}"]
####################################
[docs]
def integrateVariable(self, y:str, *, x:str="CA", start:float=None, end:float=None) -> float:
"""
Integrate a variable over another.
If inital or final CA are not given, are set to first/last in CA range.
Args:
y (str): name of y variable.
x (str, optional): Name of x variable. Defaults to "CA".
start (float, optional): Initial CA. Defaults to None.
end (float, optional): Final CA. Defaults to None.
Returns:
float: Integrated value
"""
if not x in self.data.columns:
raise ValueError(f"Variable '{x}' not present among data.")
if not y in self.data.columns:
raise ValueError(f"Variable '{y}' not present among data.")
from scipy import integrate
start = self.data.loc[0, "CA"] if start is None else start
end = self.data.loc[len(self.data)-1, "CA"] if end is None else end
self.checkType(start,float,"start")
self.checkType(end,float,"end")
index = self.data.index[np.array(self.data.loc[:,"CA"] >= start) & np.array(self.data.loc[:,"CA"] <= end)]
data = self.data.iloc[index]
#Filter out "nan"
Yarray = data.loc[:,y].copy()
Yarray[np.isnan(Yarray)] = 0.0
return integrate.trapz(Yarray, x=data.loc[:,x])
####################################
[docs]
def cumulativeIntegral(self, y:str, *, x:str="CA", start:float=None) -> np.ndarray:
"""
Compute the cumulative integral of a variable over another.
If start is not given, it is set to self.time.startOfCombustion.
Args:
y (str): name of y variable.
x (str, optional): Name of x variable. Defaults to "CA".
start (float, optional): Initial CA. Defaults to None.
Returns:
pd.DataFrame: Cumulative integral function
"""
if not x in self.data.columns:
raise ValueError(f"Variable '{x}' not present among data.")
if not y in self.data.columns:
raise ValueError(f"Variable '{y}' not present among data.")
from scipy import integrate
#Check for start
start = self.time.startOfCombustion() if start is None else start
#Check for motored
start = self.data.loc[:,"CA"][0] if start is None else start
#Check type
self.checkType(start,float,"start")
#Filter out "nan"
Yarray = self.data.loc[:,y].copy()
Yarray[np.isnan(Yarray)] = 0.0
#Compute cumulative
out = integrate.cumulative_trapezoid(Yarray, x=self.data.loc[:,x], initial=0.0)
#Set zero at start
valAtStart = np.interp(start, self.data.loc[:,"CA"], out)
out -= valAtStart
return out
####################################
[docs]
def IMEP(self, start:float=None, end:float=None) -> float:
"""
Compute indicated mean effective pressure.
If inital or final CA are not given, are set to first/last in CA range.
Args:
start (float, optional): Initial CA. Defaults to None.
end (float, optional): Final CA. Defaults to None.
Returns:
float: IMEP [Pa]
"""
start = self.data.loc[0, "CA"] if start is None else start
end = self.data.loc[len(self.data)-1, "CA"] if end is None else end
self.checkType(start,float,"start")
self.checkType(end,float,"end")
index = self.data.index[np.array(self.data.loc[:,"CA"] >= start) & np.array(self.data.loc[:,"CA"] <= end)]
data = self.data.iloc[index]
data.loc[index,"V"] = self.geometry.V(data.loc[:,"CA"])
return self.work(start=start, end=end)/(np.nanmax(data.loc[:,"V"]) - np.nanmin(data.loc[:,"V"]))
####################################
[docs]
def work(self, start:float=None, end:float=None) -> float:
"""
Compute indicated work (positive outgoing).
If inital or final CA are not given, are set to first/last in CA range.
Args:
start (float, optional): Initial CA. Defaults to None.
end (float, optional): Final CA. Defaults to None.
Returns:
float: Work [J]
"""
from scipy import integrate
start = self.data.loc[0, "CA"] if start is None else start
end = self.data.loc[len(self.data)-1, "CA"] if end is None else end
self.checkType(start,float,"start")
self.checkType(end,float,"end")
index = self.data.index[np.array(self.data.loc[:,"CA"] >= start) & np.array(self.data.loc[:,"CA"] <= end)]
#Remove nan
index = index[np.invert(np.isnan(self.data.loc[index, "p"]))]
data = self.data.iloc[index]
data.loc[index,"V"] = self.geometry.V(data.loc[:,"CA"])
return integrate.trapz(data.loc[:,"p"], x=data.loc[:,"V"])
####################################
[docs]
def plotPV(self, /,*,start:float=None, end:float=None, loglog:bool=True, timingsParams:dict=dict(), showTimings:bool=True, ax:Axes=None, **kwargs):
"""
Create the pressure-volume diagram of the thermodynamic cycle.
Args:
start (float, optional): The beginning of the plot (CA). Defaults to None.
end (float, optional): The end of the plot (CA). Defaults to None.
loglog (bool, optional): log-log scale. Defaults to True.
timingsParams(dict, optional): The kwargs for the scatter for timings. Defaults to:
{
"edgecolor":"k",
"zorder":2,
}
showTimings (bool, optional): Show the timings through markers? Defaults to True.
ax (Axes, optional): The axes to use. Defaults to None (create new figure).
"""
#Get start and end
if start is None:
start = self.data.iloc[0]["CA"]
if end is None:
end = self.data.iloc[len(self.data)-1]["CA"]
#Set default timingsParams
default = \
{
"edgecolor":"k",
"zorder":2,
}
[timingsParams.update({p:default[p]}) for p in default if not p in timingsParams]
#Check arguments
self.checkType(start,float,"start")
self.checkType(end,float,"end")
self.checkType(loglog,bool,"loglog")
#Check if data were already processed:
if not "p" in self.data.columns:
raise ValueError("Data were not yet loaded/pre-processed (field p not present in self.data).")
#Compute volume
self.data.loc[:,"V"] = self.geometry.V(self.data.loc[:,"CA"])
#Compute pressure in bar
self.data.loc[:,"pBar"] = self.data.loc[:,"p"]/1e5
#Plot
ax = self.data.plot(x="V", y="pBar", xlabel="V [$m^3$]", ylabel="p [bar]", loglog=loglog, ax=ax, **kwargs)
#Timings
if showTimings:
timings = self.time.timings
size = [timingsParams.pop("markersize")]*len(timings) if "markersize" in timingsParams else None #The size of the markers
if not "facecolor" in timingsParams: #Use same color of plot if not specified
timingsParams["facecolor"] = ax.lines[-1]. get_color()
ax.scatter([self.geometry.V(timings[t]) for t in timings], [self.data.pBar(timings[t]) for t in timings], s=size, **timingsParams)
#Return the Axes
return ax
####################################
[docs]
def plot(self, *args, **kwargs):
"""
Redirects to 'self.data.plot' (See helper of DataFrame.plot method)
"""
return self.data.plot(*args, **kwargs)
####################################
[docs]
def plotP_ROHR(self, *, label="_noLabel", legend=False, apparent=False, pkwargs:dict=None, rohrkwargs:dict=None, axes:tuple[Axes,Axes]=None, adjustLims:bool=True, **kwargs):
"""
Plot pressure (left axis) and ROHR (right axis).
Automatically adjust limits of y axes to minimize overlapping between
left and right curves. X axis limits are set to [IVC,EVO].
Args:
label (str, optional): the label to use in the legend. Defaults to "_noLabel".
legend (bool, optional): wether to make the legend. Defaults to False.
apparent (bool, optional): plot apparent heat release instead of ROHR. Defaults to False.
adjustLims (bool, optional): Automatically adjust x/y limits. Defaults to True.
axes (tuple[matplotlib.axes.Axes,matplotlib.axes.Axes]): tuple with pressure and ROHR axes to use. Defaults to None.
pkwargs(dict, optional): The kwargs for the pressure plot. Defaults to:\
{\
"linestyle":"-"\
}
rohrkwargs(dict, optional): The kwargs for the pressure plot. Defaults to:\
{\
"linestyle":"--"\
}
**kwargs: Keyword arguments common to both plots (for example, figsize)
"""
if not rohrkwargs is None:
self.checkType(rohrkwargs, dict, "rohrkwargs")
rohrkwargs = {**rohrkwargs}
else:
rohrkwargs = {}
if not pkwargs is None:
self.checkType(pkwargs, dict, "pkwargs")
pkwargs = {**pkwargs}
else:
pkwargs = {}
#Check if data were already processed:
if not all(var in self.data.columns for var in ["p", "ROHR"]):
raise ValueError("Data were not yet processed (fields p and ROHR not present in self.data).")
#Compute pressure in bar
self.data.loc[:,"pBar"] = self.data.loc[:,"p"]/1e5
#Check axes:
pLim_old = [float("inf"), -float("inf")]
rohrLim_old = [float("inf"), -float("inf")]
if not axes is None:
self.checkType(axes, tuple, "axes")
if not len(axes) == 2:
raise ValueError("Entry 'axes' must be of length 2.")
from matplotlib import pyplot as plt
if not axes[0] is None:
self.checkType(axes[0], Axes, "axes[0]")
pLim_old = axes[0].get_ylim()
if not axes[1] is None:
self.checkType(axes[1], Axes, "axes[1]")
rohrLim_old = axes[1].get_ylim()
axp = axes[0]
axrohr = axes[1]
else:
axp = None
axrohr = None
#Set default kwargs
default_p = \
{
"linestyle":"-",
}
[pkwargs.update({p:default_p[p]}) for p in default_p if not p in pkwargs]
[pkwargs.update({p:kwargs[p]}) for p in kwargs]
default_rohr = \
{
"linestyle":"--",
}
[rohrkwargs.update({p:default_rohr[p]}) for p in default_rohr if not p in rohrkwargs]
[rohrkwargs.update({p:kwargs[p]}) for p in kwargs]
#Plot pressure:
axp = self.data.plot(x="CA", y="pBar", xlabel="CA [°]", ylabel="p [bar]", ax = axp, label=label, legend=legend, **pkwargs)
#Plot rohr
if axrohr is None:
axrohr = axp.twinx()
#Color
if not any(var in rohrkwargs for var in ["c", "color"]): #Use same color of pressure plot if not specified
rohrkwargs["color"] = axp.lines[-1].get_color()
if apparent:
self.data.plot(x="CA", y="AHRR", xlabel="CA [°]", ylabel="AHRR [J/CA]", legend=False, label="_no", ax=axrohr, **rohrkwargs)
else:
self.data.plot(x="CA", y="ROHR", xlabel="CA [°]", ylabel="ROHR [J/CA]", legend=False, label="_no", ax=axrohr, **rohrkwargs)
#x limits
xlim = kwargs["xlim"] if "xlim" in kwargs else [self.time.IVC if self.time.startOfCombustion() is None else self.time.startOfCombustion(), self.time.EVO]
axp.set_xlim(xlim)
where = np.array(self.data.loc[:,"CA"] >= xlim[0]) & np.array(self.data.loc[:,"CA"] <= xlim[-1])
#Limits
if adjustLims:
a = 0.02
b = 0.2
c = 0.7
#pressure limits
if not "ylim" in pkwargs:
data = self.data.loc[:,"pBar"][where]
if len(data[np.invert(np.isnan(data))]) > 0:
lim = [np.nanmin(data), np.nanmax(data)]
lim = [lim[0] - (lim[1] - lim[0])*b, lim[1] + (lim[1] - lim[0])*a]
lim = [min(lim[0], pLim_old[0]), max(lim[1], pLim_old[1])]
axp.set_ylim(lim)
#ROHR limits
if not "ylim" in rohrkwargs:
if apparent:
data = self.data.loc[:,"AHRR"][where]
else:
data = self.data.loc[:,"ROHR"][where]
if len(data[np.invert(np.isnan(data))]) > 0:
lim = [np.nanmin(data), np.nanmax(data)]
lim = [lim[0] - (lim[1] - lim[0])*a, lim[1] + (lim[1] - lim[0])*c]
lim = [min(lim[0], rohrLim_old[0]), max(lim[1], rohrLim_old[1])]
axrohr.set_ylim(lim)
#Return the Axes
return (axp, axrohr)
####################################
[docs]
def save(self, file:str, *, overwrite:bool=False) -> None:
"""Save the results to an excel file
Args:
file (str): The name of the file where to save
"""
self.checkType(file, str, "file")
self.checkType(overwrite, bool, "overwrite")
if os.path.isfile(file) and not overwrite:
raise IOError(f"File {file} exists. Run with overwrite=True to overwrite.")
file = file if file.endswith(".xlsx") else f"{file}.xlsx"
#Write the data
with pd.ExcelWriter(file, engine = 'xlsxwriter') as writer:
self.data._data.to_excel(writer, sheet_name="Data")
#Some info
import openpyxl
book = openpyxl.load_workbook(file)
sh = book.create_sheet("Info")
sh = book["Info"]
sh.cell(row=1, column=1).value = f"File generated with {lice.__name__} package"
for ii, row in enumerate(self.__str__().split("\n")):
sh.cell(row=ii+2, column=1).value = row
book.save(file)
#########################################################################
#Create selection table
EngineModel.createRuntimeSelectionTable()