Source code for dallinger.bots


import json
import logging
import random
import tempfile
import uuid

from cached_property import cached_property
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from import By
from import expected_conditions as EC
from import WebDriverWait
from six.moves import urllib
import gevent
import requests
from requests.exceptions import RequestException

logger = logging.getLogger(__file__)

    "phantomjs": webdriver.PhantomJS,
    "firefox": webdriver.Firefox,
    "chrome": webdriver.Chrome,
    "phantomjs": webdriver.DesiredCapabilities.PHANTOMJS,
    "firefox": webdriver.DesiredCapabilities.FIREFOX,
    "chrome": webdriver.DesiredCapabilities.CHROME,

[docs]class BotBase(object): """A base class for bots that works with the built-in demos. This kind of bot uses Selenium to interact with the experiment using a real browser. """ def __init__( self, URL, assignment_id="", worker_id="", participant_id="", hit_id="" ): if not URL: return"Creating bot with URL: %s." % URL) self.URL = URL parts = urllib.parse.urlparse(URL) query = urllib.parse.parse_qs(parts.query) if not assignment_id: assignment_id = query.get("assignment_id", [""])[0] if not participant_id: participant_id = query.get("participant_id", [""])[0] if not hit_id: hit_id = query.get("hit_id", [""])[0] self.assignment_id = assignment_id if not worker_id: worker_id = query.get("worker_id", [""])[0] self.participant_id = participant_id self.hit_id = hit_id self.worker_id = worker_id self.unique_id = worker_id + ":" + assignment_id def log(self, msg):"{}: {}".format(self.participant_id, msg)) @cached_property def driver(self): """Returns a Selenium WebDriver instance of the type requested in the configuration.""" from dallinger.config import get_config config = get_config() if not config.ready: config.load() driver_url = config.get("webdriver_url", None) driver_type = config.get("webdriver_type") driver = None if driver_url: capabilities = CAPABILITY_MAP.get(driver_type.lower()) if capabilities is None: raise ValueError( "Unsupported remote webdriver_type: {}".format(driver_type) ) driver = webdriver.Remote( desired_capabilities=capabilities, command_executor=driver_url ) else: driver_class = DRIVER_MAP.get(driver_type.lower()) if driver_class is not None: kwargs = {} # Phantom JS needs a new local storage directory for every run if driver_class is webdriver.PhantomJS: tmpdirname = tempfile.mkdtemp() kwargs = { "service_args": ["--local-storage-path={}".format(tmpdirname)], } driver = driver_class(**kwargs) if driver is None: raise ValueError("Unsupported webdriver_type: {}".format(driver_type)) driver.set_window_size(1024, 768)"Created {} webdriver.".format(driver_type)) return driver
[docs] def sign_up(self): """Accept HIT, give consent and start experiment. This uses Selenium to click through buttons on the ad, consent, and instruction pages. """ try: self.driver.get(self.URL)"Loaded ad page.") begin = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.CLASS_NAME, "btn-primary")) )"Clicked begin experiment button.") WebDriverWait(self.driver, 10).until(lambda d: len(d.window_handles) == 2) self.driver.switch_to_window(self.driver.window_handles[-1]) self.driver.set_window_size(1024, 768)"Switched to experiment popup.") consent = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.ID, "consent")) )"Clicked consent button.") participate = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.CLASS_NAME, "btn-success")) )"Clicked start button.") return True except TimeoutException: logger.error("Error during experiment sign up.") return False
[docs] def participate(self): """Participate in the experiment. This method must be implemented by subclasses of ``BotBase``. """ logger.error("Bot class does not define participate method.") raise NotImplementedError
[docs] def complete_questionnaire(self): """Complete the standard debriefing form. Answers the questions in the base questionnaire. """"Complete questionnaire.") difficulty = self.driver.find_element_by_id("difficulty") difficulty.value = "4" engagement = self.driver.find_element_by_id("engagement") engagement.value = "3"
[docs] def sign_off(self): """Submit questionnaire and finish. This uses Selenium to click the submit button on the questionnaire and return to the original window. """ try:"Bot player signing off.") feedback = WebDriverWait(self.driver, 20).until( EC.presence_of_element_located((By.ID, "submit-questionnaire")) ) self.complete_questionnaire()"Clicked submit questionnaire button.") self.driver.switch_to_window(self.driver.window_handles[0]) self.driver.set_window_size(1024, 768)"Switched back to initial window.") return True except TimeoutException: logger.error("Error during experiment sign off.") return False
[docs] def complete_experiment(self, status): """Sends worker status ('worker_complete' or 'worker_failed') to the experiment server. """ url = self.driver.current_url p = urllib.parse.urlparse(url) complete_url = "%s://%s/%s?participant_id=%s" complete_url = complete_url % (p.scheme, p.netloc, status, self.participant_id) self.driver.get(complete_url)"Forced call to %s: %s" % (status, complete_url))
[docs] def run_experiment(self): """Sign up, run the ``participate`` method, then sign off and close the driver.""" try: self.sign_up() self.participate() if self.sign_off(): self.complete_experiment("worker_complete") else: self.complete_experiment("worker_failed") finally: self.driver.quit()
[docs]class HighPerformanceBotBase(BotBase): """A base class for bots that do not interact using a real browser. Instead, this kind of bot makes requests directly to the experiment server. """ @property def driver(self): raise NotImplementedError @property def host(self): parsed = urllib.parse.urlparse(self.URL) return urllib.parse.urlunparse([parsed.scheme, parsed.netloc, "", "", "", ""])
[docs] def run_experiment(self): """Runs the phases of interacting with the experiment including signup, participation, signoff, and recording completion. """ self.sign_up() self.participate() if self.sign_off(): self.complete_experiment("worker_complete") else: self.complete_experiment("worker_failed")
[docs] def sign_up(self): """Signs up a participant for the experiment. This is done using a POST request to the /participant/ endpoint. """ self.log("Bot player signing up.") self.subscribe_to_quorum_channel() while True: url = ( "{host}/participant/{self.worker_id}/" "{self.hit_id}/{self.assignment_id}/" "debug?fingerprint_hash={hash}&recruiter=bots:{bot_name}".format(, self=self, hash=uuid.uuid4().hex, bot_name=self.__class__.__name__, ) ) try: result = result.raise_for_status() except RequestException: self.stochastic_sleep() continue if result.json()["status"] == "error": self.stochastic_sleep() continue self.on_signup(result.json()) return True
[docs] def sign_off(self): """Submit questionnaire and finish. This is done using a POST request to the /question/ endpoint. """ self.log("Bot player signing off.") return self.complete_questionnaire()
[docs] def complete_experiment(self, status): """Record worker completion status to the experiment server. This is done using a GET request to the /worker_complete or /worker_failed endpoints. """ self.log("Bot player completing experiment. Status: {}".format(status)) while True: url = "{host}/{status}?participant_id={participant_id}".format(, participant_id=self.participant_id, status=status ) try: result = requests.get(url) result.raise_for_status() except RequestException: self.stochastic_sleep() continue return result
def stochastic_sleep(self): delay = max(1.0 / random.expovariate(0.5), 10.0) gevent.sleep(delay)
[docs] def subscribe_to_quorum_channel(self): """In case the experiment enforces a quorum, listen for notifications before creating Partipant objects. """ from dallinger.experiment_server.sockets import chat_backend self.log("Bot subscribing to quorum channel.") chat_backend.subscribe(self, "quorum")
[docs] def on_signup(self, data): """Take any needed action on response from /participant call.""" self.participant_id = data["participant"]["id"]
@property def question_responses(self): return {"engagement": 4, "difficulty": 3}
[docs] def complete_questionnaire(self): """Complete the standard debriefing form. Answers the questions in the base questionnaire. """ while True: data = { "question": "questionnaire", "number": 1, "response": json.dumps(self.question_responses), } url = "{host}/question/{self.participant_id}".format(, self=self ) try: result =, data=data) result.raise_for_status() except RequestException: self.stochastic_sleep() continue return True