Source code for abce

# Copyright 2012 Davoud Taghawi-Nejad
#
# Module Author: Davoud Taghawi-Nejad
#
# ABCE is open-source software. If you are using ABCE for your research you are
# requested to quote the use of this software.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License and quotation of the
# author. You may obtain a copy of the License at
#       http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
# pylint: disable=W0212, C0111


""" The best way to start creating a simulation is by copying the start.py
file and other files from 'abce/template' in https://github.com/AB-CE/examples.

To see how to create a simulation read :doc:`ipython_tutorial`.


This is a minimal template for a start.py::

    from agent import Agent
    from abce import *


    simulation = Simulation(name='ABCE')
    agents = simulation.build_agents(Agent, 'agent', 2)
    for time in range(100):
        simulation.advance_round(time)
        agents.one()
        agents.two()
        agents.three()
    simulation.graphs()

Note that two things are important: there must be either a
:func:`~abce.simulation.graphs` or a :func:`~abce.simulation.finalize` call at
the end, otherwise the simulation blocks at the end.
Further, every round needs to be announced using
simulation.advance_round(time), where time is any representation of time.

"""
import datetime
import os
import time
import random
import json
import queue
import multiprocessing as mp
from multiprocessing.managers import BaseManager
from collections import defaultdict, OrderedDict
from .db import Database
from .agent import Agent, Trade  # noqa: F401
from .group import Group
from .notenoughgoods import NotEnoughGoods  # noqa: F401
from .agents import (FirmMultiTechnologies, Firm,  # noqa: F401
                     Household, Utility_Function,
                     ProductionFunction, SilentDeadAgent,  # noqa: F401
                     LoudDeadAgent)  # noqa: F401
from .quote import Quote  # noqa: F401
from .contracts import Contracting  # noqa: F401
from .processorgroup import ProcessorGroup
from .gui import gui, graphs  # noqa: F401


def execute_advance_round_wrapper(inp):
    """ Advance one processor group by one round.

    Module-level adapter so that the call can be dispatched through
    ``Pool.map``, which passes exactly one argument per task.

    Args:
        inp:
            an indexable pair; ``inp[0]`` is the processor group and
            ``inp[1]`` is the time handed to its ``execute_advance_round``

    Returns:
        whatever ``execute_advance_round`` of the processor group returns
    """
    processor_group = inp[0]
    round_time = inp[1]
    return processor_group.execute_advance_round(round_time)
class MyManager(BaseManager):
    """ Manager subclass on which ``ProcessorGroup`` is registered.

    A dedicated subclass keeps the registration off ``BaseManager`` itself.
    """
    pass
class Simulation(object):
    """ This is the class in which the simulation is run. Actions and agents
    have to be added. Databases and resource declarations can be added. Then
    the simulation is run.

    Args:
        name:
            name of the simulation

        random_seed (optional):
            a random seed that controls the random numbers of the simulation;
            ``None`` or ``0`` seeds from the current time

        trade_logging:
            Whether trades are logged, trade_logging can be
            'group' (fast) or 'individual' (slow) or 'off'

        processes (optional):
            The number of processes that run in parallel. Each process hosts
            a share of the agents. The default is 1: the simulation is
            executed without parallelization. 'None' uses all logical
            processor cores times 2. Sometimes it is advisable to decrease
            the number of processes to the number of logical or even physical
            processor cores on your computer.
            **For easy debugging set processes to 1, this way only one agent
            runs at a time and only one error message is displayed**

        check_unchecked_msgs:
            check every round that all messages have been received (e.g.
            with get_messages or get_offers).

    Raises:
        ValueError: if trade_logging is not one of the accepted values

    Example::

        simulation = Simulation(name='ABCE',
                                trade_logging='individual',
                                processes=None)

    Example for a simulation::

        num_firms = 5
        num_households = 2000

        w = Simulation(name='ABCE',
                       trade_logging='individual',
                       processes=None)

        w.declare_round_endowment(resource='labor_endowment',
                                  units=1,
                                  product='labor')

        w.panel('firm', command='after_sales_before_consumption')

        firms = w.build_agents(Firm, 'firm', num_firms)
        households = w.build_agents(Household, 'household', num_households)

        all = firms + households

        for r in range(100):
            w.advance_round(r)
            households.recieve_connections()
            households.offer_capital()
            firms.buy_capital()
            firms.production()
            if r == 250:
                centralbank.intervention()
            households.buy_product()
            all.after_sales_before_consumption()
            households.consume()

        w.finalize()
        w.graphs()
    """

    def __init__(self, name='abce', random_seed=None,
                 trade_logging='off', processes=1,
                 check_unchecked_msgs=False):
        """ Set up result directory, processor groups and database. """
        # Validate first: a bad argument must neither create result
        # directories nor spawn worker processes.
        if trade_logging not in ('individual', 'group', 'off'):
            raise ValueError(
                "trade_logging can be "
                "'group' (fast) or 'individual' (slow) or 'off'"
                ">" + str(trade_logging) + "< not accepted")

        # NOTE(review): `simulation_name` is not defined in this module;
        # presumably it is injected as a global by a launcher - confirm.
        try:
            name = simulation_name  # noqa: F821
        except NameError:
            pass

        self.check_unchecked_msgs = check_unchecked_msgs
        self.num_of_agents_in_group = {}
        self._messages = {}
        self._resource_command_group = {}
        self._db_commands = {}
        self.num_agents = 0
        self._build_first_run = True
        self.resource_endowment = []
        self.perishable = []
        self.expiring = []

        result_root = os.path.abspath('.') + '/result/'
        # Genuine failures (e.g. missing permissions) propagate instead of
        # being swallowed and resurfacing in the loop below.
        os.makedirs(result_root, exist_ok=True)

        #: the path variable contains the path to the simulation outcomes;
        #: it can be used to generate your own graphs as all resulting csv
        #: files are there.
        self.path = (result_root + name + '_' +
                     datetime.datetime.now().strftime("%Y-%m-%d_%H-%M"))
        # Append 'I' until the directory name is unused. Only "already
        # exists" is retried; any other OSError would never go away and
        # would turn this into an endless loop.
        while True:
            try:
                os.makedirs(self.path)
                break
            except FileExistsError:
                self.path += 'I'

        self.trade_logging_mode = trade_logging

        self.processes = mp.cpu_count() * 2 if processes is None else processes

        if processes == 1:
            # serial mode: everything lives in this process
            self.database_queue = queue.Queue()
            self._processor_groups = [ProcessorGroup(1, batch=0)]
            self.execute_advance_round = self._execute_advance_round_seriel
        else:
            queue_manager = mp.Manager()
            self.database_queue = queue_manager.Queue()
            self.pool = mp.Pool(self.processes)

            MyManager.register('ProcessorGroup', ProcessorGroup)
            self.managers = []
            self._processor_groups = []
            # one manager, hosting one processor group, per batch
            for batch in range(self.processes):
                group_manager = MyManager()
                group_manager.start()
                self.managers.append(group_manager)
                pg = group_manager.ProcessorGroup(self.processes, batch=batch)
                self._processor_groups.append(pg)
            self.execute_advance_round = self._execute_advance_round_parallel

        self.messagess = [list() for _ in range(self.processes + 1)]

        self._db = Database(
            self.path,
            self.database_queue,
            trade_log=self.trade_logging_mode != 'off')
        self._db_started = False

        if random_seed is None or random_seed == 0:
            random_seed = time.time()
        random.seed(random_seed)

        self.sim_parameters = OrderedDict(
            {'name': name, 'random_seed': random_seed})
        self.clock = time.time()
        self.database = self

        #: the current time, as set with simulation.advance_round(time)
        self.time = None

    def declare_round_endowment(self, resource, units, product):
        """ At the beginning of every round the agent gets 'units' units of
        good 'product' for every 'resource' he possesses.

        Must be called before any agents are built.

        Args::

            resource:
                The good that you have to hold to get the other
            units:
                the multiplier to get the produced good
            product:
                the good that is produced if you hold the first good

        Raises:
            Exception: if agents have already been built

        Example::

            A farmer gets a ton of harvest for every acre:

            w.declare_round_endowment(resource='land',
                                      units=1000,
                                      product='wheat')
        """
        if self.num_of_agents_in_group:
            raise Exception(
                "WARNING: declare_round_endowment(...)"
                " must be called before the agents are build")
        self.resource_endowment.append((resource, units, product))

    def declare_perishable(self, good):
        """ This good only lasts one round and then disappears. For example
        labor, if the labor is not used today today's labor is lost.
        In combination with resource this is useful to model labor or
        capital.

        In the example below a worker has an endowment of labor and capital.
        Every round he can sell his labor service and rent his capital. If
        he does not the labor service for this round and the rent is lost.

        Must be called before any agents are built.

        Args::

            good:
                the good that perishes

        Raises:
            Exception: if agents have already been built

        Example::

            w.declare_perishable(good='LAB')
            w.declare_perishable(good='CAP')
        """
        if self.num_of_agents_in_group:
            raise Exception(
                "WARNING: declare_perishable(...) must be called before "
                "the agents are build")
        self.perishable.append(good)

    def declare_expiring(self, good, duration):
        """ This type of good lasts for several rounds, but eventually
        expires. For example computers would last for several years and then
        become obsolete.

        Must be called before any agents are built.

        Args:

            good:
                the good, which expires
            duration:
                the duration before the good expires

        Raises:
            Exception: if agents have already been built
        """
        if self.num_of_agents_in_group:
            raise Exception(
                "WARNING: declare_expiring(...) must be called "
                "before the agents are build")
        self.expiring.append((good, duration))

    def declare_service(self, human_or_other_resource, units, service):
        """ When the agent holds the human_or_other_resource, he gets 'units'
        of service every round; the service can be used only within this
        round.

        Shorthand for a round endowment whose product is perishable.

        Args::

            human_or_other_resource:
                the good that needs to be in possession to create the other
                good, e.g. 'self.create('adult', 2)'
            units:
                how many units of the service are available
            service:
                the service that is created

        Example::

            For example if a household has two adult family members, it gets
            16 hours of work

            w.declare_service('adult', 8, 'work')
        """
        self.declare_round_endowment(human_or_other_resource, units, service)
        self.declare_perishable(service)

    def _execute_advance_round_seriel(self, time):
        """ Advance every processor group in turn, in this process. """
        for pg in self._processor_groups:
            pg.execute_advance_round(time)

    def _execute_advance_round_parallel(self, time):
        """ Advance all processor groups through the worker pool. """
        parameters = ((pg, time) for pg in self._processor_groups)
        self.pool.map(execute_advance_round_wrapper, parameters, chunksize=1)

    def advance_round(self, time):
        """ Announce a new round; must be called at the start of every round.

        Starts the database on the first call.

        Args:
            time:
                any representation of the current time; stored in
                :code:`simulation.time`
        """
        if not self._db_started:
            self._db.start()
            self._db_started = True
        self.time = time
        print("\rRound" + str(time))
        self.execute_advance_round(time)

    def __del__(self):
        self.finalize()

    def finalize(self):
        """ simulation.finalize() must be run after each simulation. It will
        write all data to disk.

        Does nothing if the database was never started or the simulation was
        already finalized.

        Example::

            simulation = Simulation(...)
            ...
            for r in range(100):
                simulation.advance_round(r)
                agents.do_something()
                ...

            simulation.finalize()
        """
        # getattr: this also runs from __del__, possibly on an instance
        # whose __init__ failed before `_db_started` was assigned.
        if not getattr(self, '_db_started', False):
            return
        self._db_started = False
        print('')
        print(str("time only simulation %6.2f" %
                  (time.time() - self.clock)))

        self.database_queue.put('close')

        # wait for the database to finish writing
        while self._db.is_alive():
            time.sleep(0.05)

        # the pool only exists in parallel mode
        pool = getattr(self, 'pool', None)
        if pool is not None:
            pool.close()
            pool.join()

        print(str("time with data %6.2f" %
                  (time.time() - self.clock)))
        self._write_description_file()
        self._displaydescribtion()

    def build_agents(self, AgentClass, group_name, number=None,
                     parameters=None, agent_parameters=None):
        """ This method creates agents.

        Args:

            AgentClass:
                is the name of the AgentClass that you imported

            group_name:
                the name of the group, as it will be used in the action list
                and transactions. Should generally be lowercase of the
                AgentClass.

            number:
                number of agents to be created.

            parameters:
                a dictionary of parameters (defaults to an empty dictionary)

            agent_parameters:
                a list of dictionaries, where each agent gets one dictionary.
                The number of agents is the length of the list

        Returns:
            a Group for the newly built agents

        Raises:
            ValueError: if neither number nor agent_parameters is given

        Example::

            firms = simulation.build_agents(
                Firm, 'firm', number=simulation_parameters['num_firms'])
            banks = simulation.build_agents(
                Bank, 'bank',
                parameters=simulation_parameters,
                agent_parameters=[{'name': 'UBS'},
                                  {'name': 'amex'},
                                  {'name': 'chase'}])

            centralbanks = simulation.build_agents(
                CentralBank, 'centralbank',
                number=1,
                parameters={'rounds': num_rounds})
        """
        assert number is None or agent_parameters is None, \
            'either set number or agent_parameters in build_agents'
        if number is None and agent_parameters is None:
            raise ValueError(
                'either set number or agent_parameters in build_agents')
        # fresh dict per call, never a default shared between calls
        if parameters is None:
            parameters = {}

        if number is not None:
            num_agents_this_group = number
            agent_parameters = [None] * num_agents_this_group
        else:
            num_agents_this_group = len(agent_parameters)

        self.sim_parameters.update(parameters)

        agent_params_from_sim = {
            'expiring': self.expiring,
            'perishable': self.perishable,
            'resource_endowment': self.resource_endowment}

        for pg in self._processor_groups:
            pg.add_group(
                AgentClass,
                num_agents_this_group=num_agents_this_group,
                agent_args={
                    'group': group_name,
                    'trade_logging': self.trade_logging_mode,
                    'database': self.database_queue,
                    'random_seed': random.random(),
                    'agent_parameters': agent_parameters,
                    'simulation_parameters': parameters,
                    'check_unchecked_msgs': self.check_unchecked_msgs},
                parameters=parameters,
                agent_parameters=agent_parameters,
                agent_params_from_sim=agent_params_from_sim)
        self.num_of_agents_in_group[group_name] = num_agents_this_group
        return Group(self, [group_name], AgentClass)

    def create_agent(self, AgentClass, group_name, parameters=None,
                     agent_parameters=None):
        """ Creates an additional agent in an existing group during the
        simulation.

        Args:

            AgentClass:
                the class of agent to create.
                (can be the same class as the creating agent)

            'group_name':
                the name of the group the agent should belong to. This is
                the group name string e.G. :code:`'firm'`, not the group
                variable e.G. :code:`firms` in
                :code:`firms = simulation.build_agents(...)`

            parameters:
                a dictionary of parameters

            agent_parameters:
                a dictionary of parameters

        Raises:
            KeyError: if no group named group_name has been built

        Example::

            self.create_agent(BeerFirm, 'beerfirm',
                              parameters=self.parameters,
                              agent_parameters={'creation': self.time})
        """
        # the new agent gets the next free id of its group
        agent_id = self.num_of_agents_in_group[group_name]
        self.num_of_agents_in_group[group_name] += 1
        # ids are spread over the processor groups round-robin
        pg = self._processor_groups[agent_id % self.processes]
        pg.append(
            AgentClass,
            id=agent_id,
            agent_args={
                'group': group_name,
                'trade_logging': self.trade_logging_mode,
                'database': self.database_queue,
                'random_seed': random.random(),
                'agent_parameters': agent_parameters,
                'simulation_parameters': parameters,
                'check_unchecked_msgs': self.check_unchecked_msgs,
                'start_round': self.time},
            parameters=parameters,
            agent_parameters=agent_parameters)

    def delete_agent(self, name, quite=True):
        """ This deletes an agent. By default, quite is set to True, all
        future messages to this agent are deleted. If quite is set to False
        agents are completely deleted. This makes the simulation faster, but
        if messages are sent to this agent the simulation stops.

        Args:
            name:
                Name tuple of the agent. e.G. ('firm', 13)

            quite:
                whether the dead agent ignores incoming messages.
        """
        group, agent_id = name
        pg = self._processor_groups[agent_id % self.processes]
        dead_agent_class = SilentDeadAgent if quite else LoudDeadAgent
        pg.replace_with_dead(group, agent_id, dead_agent_class)

    def _write_description_file(self):
        """ Dump the simulation parameters as JSON to description.txt. """
        target = os.path.abspath(self.path + '/description.txt')
        # `with` closes (and thereby flushes) the file before it is read back
        with open(target, 'w') as description:
            description.write(json.dumps(
                self.sim_parameters,
                indent=4,
                skipkeys=True,
                default=lambda x: 'not_serializeable'))

    def _displaydescribtion(self):
        """ Print the content of description.txt. """
        with open(self.path + '/description.txt', 'r') as description:
            print(description.read())

    def graphs(self):
        """ after the simulation is run, graphs() shows graphs of all data
        collected in the simulation. Shows the same output as the @gui
        decorator shows.

        Calls finalize() first.

        Example::

            simulation = Simulation(...)
            for r in range(100):
                simulation.advance_round(r)
                agents.do_something()
                ...

            simulation.graphs()
        """
        self.finalize()
        graphs(self.sim_parameters)
def _number_or_string(word):
    """ Convert a string to the most specific type it represents.

    Args:
        word:
            the string to convert

    Returns:
        an int if ``int(word)`` succeeds, otherwise a float if
        ``float(word)`` succeeds, otherwise ``word`` itself unchanged
    """
    for convert in (int, float):
        try:
            return convert(word)
        except ValueError:
            continue
    return word
def defaultdict_list():
    """ Return a new, empty ``defaultdict`` whose missing values are lists.

    A named module-level factory, unlike a lambda, can be pickled.
    """
    return defaultdict(list)