Source code for dallinger.bots

"""Bots."""

import json
import logging
import random
import uuid

from cached_property import cached_property
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from six.moves import urllib
import gevent
import requests
from requests.exceptions import RequestException

logger = logging.getLogger(__file__)


[docs]class BotBase(object): """A base class for bots that works with the built-in demos. This kind of bot uses Selenium to interact with the experiment using a real browser. """ def __init__(self, URL, assignment_id='', worker_id='', participant_id='', hit_id=''): if not URL: return logger.info("Creating bot with URL: %s." % URL) self.URL = URL parts = urllib.parse.urlparse(URL) query = urllib.parse.parse_qs(parts.query) if not assignment_id: assignment_id = query.get('assignment_id', [''])[0] if not participant_id: participant_id = query.get('participant_id', [''])[0] if not hit_id: hit_id = query.get('hit_id', [''])[0] self.assignment_id = assignment_id if not worker_id: worker_id = query.get('worker_id', [''])[0] self.participant_id = participant_id self.hit_id = hit_id self.worker_id = worker_id self.unique_id = worker_id + ':' + assignment_id def log(self, msg): logger.info('{}: {}'.format(self.participant_id, msg)) @cached_property def driver(self): """Returns a Selenium WebDriver instance of the type requested in the configuration.""" from dallinger.config import get_config config = get_config() if not config.ready: config.load() driver_url = config.get('webdriver_url', None) driver_type = config.get('webdriver_type', 'phantomjs').lower() if driver_url: capabilities = {} if driver_type == 'firefox': capabilities = webdriver.DesiredCapabilities.FIREFOX elif driver_type == 'chrome': capabilities = webdriver.DesiredCapabilities.CHROME elif driver_type == 'phantomjs': capabilities = webdriver.DesiredCapabilities.PHANTOMJS else: raise ValueError( 'Unsupported remote webdriver_type: {}'.format(driver_type)) driver = webdriver.Remote( desired_capabilities=capabilities, command_executor=driver_url ) elif driver_type == 'phantomjs': driver = webdriver.PhantomJS() elif driver_type == 'firefox': driver = webdriver.Firefox() elif driver_type == 'chrome': driver = webdriver.Chrome() else: raise ValueError( 'Unsupported webdriver_type: {}'.format(driver_type)) driver.set_window_size(1024, 768) logger.info("Created {} webdriver.".format(driver_type)) return driver
[docs] def sign_up(self): """Accept HIT, give consent and start experiment. This uses Selenium to click through buttons on the ad, consent, and instruction pages. """ try: self.driver.get(self.URL) logger.info("Loaded ad page.") begin = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.CLASS_NAME, 'btn-primary'))) begin.click() logger.info("Clicked begin experiment button.") WebDriverWait(self.driver, 10).until( lambda d: len(d.window_handles) == 2) self.driver.switch_to_window(self.driver.window_handles[-1]) self.driver.set_window_size(1024, 768) logger.info("Switched to experiment popup.") consent = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.ID, 'consent'))) consent.click() logger.info("Clicked consent button.") participate = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.CLASS_NAME, 'btn-success'))) participate.click() logger.info("Clicked start button.") return True except TimeoutException: logger.error("Error during experiment sign up.") return False
[docs] def participate(self): """Participate in the experiment. This method must be implemented by subclasses of ``BotBase``. """ logger.error("Bot class does not define participate method.") raise NotImplementedError
[docs] def complete_questionnaire(self): """Complete the standard debriefing form. Answers the questions in the base questionnaire. """ logger.info("Complete questionnaire.") difficulty = self.driver.find_element_by_id('difficulty') difficulty.value = '4' engagement = self.driver.find_element_by_id('engagement') engagement.value = '3'
[docs] def sign_off(self): """Submit questionnaire and finish. This uses Selenium to click the submit button on the questionnaire and return to the original window. """ try: logger.info("Bot player signing off.") feedback = WebDriverWait(self.driver, 20).until( EC.presence_of_element_located((By.ID, 'submit-questionnaire'))) self.complete_questionnaire() feedback.click() logger.info("Clicked submit questionnaire button.") self.driver.switch_to_window(self.driver.window_handles[0]) self.driver.set_window_size(1024, 768) logger.info("Switched back to initial window.") return True except TimeoutException: logger.error("Error during experiment sign off.") return False
[docs] def complete_experiment(self, status): """Sends worker status ('worker_complete' or 'worker_failed') to the experiment server. """ url = self.driver.current_url p = urllib.parse.urlparse(url) complete_url = '%s://%s/%s?participant_id=%s' complete_url = complete_url % (p.scheme, p.netloc, status, self.participant_id) self.driver.get(complete_url) logger.info("Forced call to %s: %s" % (status, complete_url))
[docs] def run_experiment(self): """Sign up, run the ``participate`` method, then sign off and close the driver.""" try: self.sign_up() self.participate() if self.sign_off(): self.complete_experiment('worker_complete') else: self.complete_experiment('worker_failed') finally: self.driver.quit()
[docs]class HighPerformanceBotBase(BotBase): """A base class for bots that do not interact using a real browser. Instead, this kind of bot makes requests directly to the experiment server. """ @property def driver(self): raise NotImplementedError @property def host(self): parsed = urllib.parse.urlparse(self.URL) return urllib.parse.urlunparse([parsed.scheme, parsed.netloc, '', '', '', ''])
[docs] def run_experiment(self): """Runs the phases of interacting with the experiment including signup, participation, signoff, and recording completion. """ self.sign_up() self.participate() if self.sign_off(): self.complete_experiment('worker_complete') else: self.complete_experiment('worker_failed')
[docs] def sign_up(self): """Signs up a participant for the experiment. This is done using a POST request to the /participant/ endpoint. """ self.log('Bot player signing up.') self.subscribe_to_quorum_channel() while True: url = ( "{host}/participant/{self.worker_id}/" "{self.hit_id}/{self.assignment_id}/" "debug?fingerprint_hash={hash}&recruiter=bots:{bot_name}".format( host=self.host, self=self, hash=uuid.uuid4().hex, bot_name=self.__class__.__name__ ) ) try: result = requests.post(url) result.raise_for_status() except RequestException: self.stochastic_sleep() continue if result.json()['status'] == 'error': self.stochastic_sleep() continue self.on_signup(result.json()) return True
[docs] def sign_off(self): """Submit questionnaire and finish. This is done using a POST request to the /question/ endpoint. """ self.log('Bot player signing off.') return self.complete_questionnaire()
[docs] def complete_experiment(self, status): """Record worker completion status to the experiment server. This is done using a GET request to the /worker_complete or /worker_failed endpoints. """ self.log('Bot player completing experiment. Status: {}'.format(status)) while True: url = ( "{host}/{status}?participant_id={participant_id}".format( host=self.host, participant_id=self.participant_id, status=status ) ) try: result = requests.get(url) result.raise_for_status() except RequestException: self.stochastic_sleep() continue return result
def stochastic_sleep(self): delay = max(1.0 / random.expovariate(0.5), 10.0) gevent.sleep(delay)
[docs] def subscribe_to_quorum_channel(self): """In case the experiment enforces a quorum, listen for notifications before creating Partipant objects. """ from dallinger.experiment_server.sockets import chat_backend self.log("Bot subscribing to quorum channel.") chat_backend.subscribe(self, 'quorum')
[docs] def on_signup(self, data): """Take any needed action on response from /participant call.""" self.participant_id = data['participant']['id']
@property def question_responses(self): return {"engagement": 4, "difficulty": 3}
[docs] def complete_questionnaire(self): """Complete the standard debriefing form. Answers the questions in the base questionnaire. """ while True: data = { 'question': 'questionnaire', 'number': 1, 'response': json.dumps(self.question_responses), } url = ( "{host}/question/{self.participant_id}".format( host=self.host, self=self, ) ) try: result = requests.post(url, data=data) result.raise_for_status() except RequestException: self.stochastic_sleep() continue return True