import os
from email.header import Header
from email.mime.text import MIMEText
import tweepy
import pickle
import smtplib
import traceback

MONITORED_ACCOUNTS = []
KEYWORDS = []
ALERT_FROM_EMAIL = ''
ALERT_FROM_PASSWORD = ''
ALERT_TO_EMAIL = ''
SUBJECT_LINE = 'Twitter Town Hall Alerts'
LAST_TWEET_FILE = 'last_tweet_ids.pickle'

CONSUMER_KEY = ''
CONSUMER_SECRET = ''
ACCESS_TOKEN = ''
ACCESS_TOKEN_SECRET = ''


def load_last_tweet_ids():
    """
    Load the file containing the last processed tweet ids for each monitored account.
    Create a file mapping every monitored account to None if none exists already.

    NOTE: the state file is read with pickle, which can execute arbitrary code while
    loading. Only ever point LAST_TWEET_FILE at a file written by this script.

    :return: a dictionary mapping each account to the most recent processed tweet id
    """
    if not os.path.isfile(LAST_TWEET_FILE):
        with open(LAST_TWEET_FILE, 'wb') as f:
            pickle.dump({account: None for account in MONITORED_ACCOUNTS}, f)

    with open(LAST_TWEET_FILE, 'rb') as f:
        last_tweet_ids = pickle.load(f)

    return last_tweet_ids


def update_last_tweet_ids(last_tweet_ids):
    """
    Update the last processed tweets file with new ids

    :param last_tweet_ids: the new dict of ids
    """
    with open(LAST_TWEET_FILE, 'wb') as f:
        pickle.dump(last_tweet_ids, f)


def get_new_tweets(account, last_tweet_ids):
    """
    Get all tweets from an account after the most recent processed tweet.
    Also update the most recent processed tweet (``last_tweet_ids`` is mutated in place).

    On the first run for an account (no stored id) only the first page of the timeline
    is returned and the newest returned tweet becomes the stored id. On later runs the
    timeline is paged backwards until the stored id is reached or the timeline ends.

    This relies on the timeline being returned newest-first, so the first element of a
    page is its newest tweet and the last element its oldest.

    :param account: the name of the account
    :param last_tweet_ids: the dict storing the last processed tweet id
    :return: a list of new tweets, newest first (empty if there is nothing new)
    """
    print('\tFetching tweets from', account)
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    api = tweepy.API(auth)

    page = api.user_timeline(screen_name=account, count=200, tweet_mode='extended')
    print(f'\t\tGot {len(page)} tweets')

    # NOTE(review): `retweeted` may describe whether the *authenticating* user has
    # retweeted the tweet rather than whether the tweet itself is a retweet; if the
    # intent is to drop retweets, confirm against the Twitter tweet-object docs
    # (retweets carry a `retweeted_status` attribute). Behavior kept as-is here.
    last_seen_id = last_tweet_ids.get(account)
    if last_seen_id is None:
        tweets = [tweet for tweet in page if not tweet.retweeted]
        # An empty (or fully filtered) timeline leaves the marker unset instead of
        # raising IndexError; the next run treats the account as new again.
        last_tweet_ids[account] = tweets[0].id if tweets else None
        return tweets

    tweets = []
    while page:
        tweets.extend(tweet for tweet in page
                      if tweet.id > last_seen_id and not tweet.retweeted)
        # Page on the raw (unfiltered) results: a page consisting only of filtered-out
        # tweets must not end the search while older unseen tweets may remain.
        oldest_id = page[-1].id
        if oldest_id <= last_seen_id:
            break
        page = api.user_timeline(screen_name=account, count=200,
                                 max_id=oldest_id - 1, tweet_mode='extended')
        print(f'\t\tGot {len(page)} tweets')

    if tweets:
        last_tweet_ids[account] = tweets[0].id
    return tweets


def search_for_keywords(tweets):
    """
    Search new tweets for keywords. Matching is a case-insensitive substring test.

    :param tweets: a list of new tweets
    :return: a list of flagged tweets, in the order they were given
    """
    # Lowercase the keywords too; otherwise a keyword containing a capital letter
    # could never match the lowercased tweet text.
    keywords = [keyword.lower() for keyword in KEYWORDS]

    flagged_tweets = []
    for tweet in tweets:
        text = tweet.full_text.lower()
        if any(keyword in text for keyword in keywords):
            flagged_tweets.append(tweet)
    return flagged_tweets


def _send_email(subject, body):
    """
    Send a plain-text UTF-8 email from the alert account to the alert recipient.

    :param subject: the subject line
    :param body: the message text
    """
    msg = MIMEText(body, _charset='UTF-8')
    msg['Subject'] = Header(subject, 'utf-8')

    # The context manager closes the connection even if login or sending fails.
    with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
        server.login(ALERT_FROM_EMAIL, ALERT_FROM_PASSWORD)
        server.sendmail(ALERT_FROM_EMAIL, ALERT_TO_EMAIL, msg.as_string())


def send_alert(flagged_tweets, warning_accounts):
    """
    Send an email with all flagged tweets, newest first.
    No email is sent when there are no flagged tweets.

    :param flagged_tweets: a list of tweets
    :param warning_accounts: accounts whose tweets could not be fetched
    """
    if not flagged_tweets:
        return

    parts = []
    if warning_accounts:
        parts.append(f'WARNING: Unable to fetch tweets for the following accounts: '
                     f'{", ".join(warning_accounts)}\n\n')
    parts.append('There were {} flagged tweets:\n'.format(len(flagged_tweets)))
    for tweet in sorted(flagged_tweets, key=lambda x: x.created_at, reverse=True):
        parts.append(f'\n{tweet.user.name} ({tweet.created_at})\n{tweet.full_text}\n'
                     f'https://twitter.com/{tweet.user.screen_name}/status/{tweet.id}\n')

    _send_email(SUBJECT_LINE, ''.join(parts))


def send_error_email(e):
    """
    Send an email with an error message

    :param e: error that occurred (any object; it is formatted into the message text)
    """
    _send_email('Error with Town Hall Alert Bot', f'The following error occurred:\n {e}')


def main():
    """
    Fetch new tweets for every monitored account, persist the updated tweet ids,
    and email any tweets that match a keyword.
    """
    print('Running script')
    last_tweet_ids = load_last_tweet_ids()
    print('Loaded last tweet IDs')

    tweets = []
    warning_accounts = []
    for account in MONITORED_ACCOUNTS:
        try:
            tweets.extend(get_new_tweets(account, last_tweet_ids))
        except tweepy.error.TweepError:
            # Best effort: one failing account is reported in the alert email
            # but does not stop the others from being processed.
            warning_accounts.append(account)
    print('Loaded all tweets')

    update_last_tweet_ids(last_tweet_ids)
    print('Updated last tweet IDs')

    flagged_tweets = search_for_keywords(tweets)
    print('Found all flagged tweets')

    send_alert(flagged_tweets, warning_accounts)
    print('Sent email.')


if __name__ == '__main__':
    try:
        main()
    except Exception:
        send_error_email(traceback.format_exc())
        # Bare raise keeps the original traceback intact.
        raise