In this post we will provide an update on what λ Lovelace has been up to:

  • Caching data
  • Challenges & Next Tasks

Caching Data

As mentioned before, the rate limit of the Twitter API is a problem that we are always trying to work around. For the home timeline, we are only allowed to send 15 requests per 15 minutes, which means it will hit the rate limit quickly if a user keeps refreshing his/her timeline. Besides this, we can only fetch up to 800 of the newest tweets of user’s home timeline. Therefore, the tweets before the 800 tweets will never be fetched.

To solve the rate limit problem, we finally decided to use a database to cache the tweets. And our Flask server will get tweets from database directly instead of getting them from the Twitter API.

In order to cache data, we were thinking about setting up a background task which runs 24/7 for each user. This task keeps fetching the incoming tweets of the user’s home timeline and save it into the database. We considered two ways to continuously collect tweets, Twitter Streaming API or Twitter REST API + Celery.

Celery vs. Twitter Streamming API

Twitter Streamming API

The Streaming API gives developers low latency access to a stream of Tweet data. It provides three streams: public stream, user stream and site stream.

For this project, site stream will be the most suitable stream. Unfortunately, Twitter has closed the stream, which means our only choice will be the user stream. The user stream is a stream that contains roughly all of the data corresponding with a single user’s view of Twitter. Once we start a stream for a user, let it run 24/7, then we can store all the data associated with the user.

Although the user stream sounds like a very good choice of data source, we found that the streaming API also has rate limit. According to Twitter:

Each Twitter account is limited to only a few simultaneous User Streams connections per OAuth application, regardless of IP. Once the per-application limit is exceeded, the oldest connection will be terminated.

Twitter didn’t specify the maximum number of connections allowed. But we found somebody was asking “What is the maximum number of simultaneous user streams per application?” on Twitter’s Developer Forums. According to the answer, 10~20 simultaneous user streams will be reasonable. But if we have hundreds of users, user stream will be completely inappropriate for this kind of use case.

As we don’t want to introduce a new rate limit while we are trying to solve the old one. So we came up with the second solution - Celery.

Celery - Distributed Task Queue

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. So we can use Celery to set up a background task without affecting other requests of our app. As it supports scheduling, we can set up a periodic task which keeps fetching data 24/7. The basic scenario is described as below:

The Celery server is running 24/7 and the periodic task is executed every 60 seconds. The reason why we are doing this is because the rate limit of twitter API is 15 requests per 15 minutes. So if we only use one request in a minute, it will avoid hitting the rate limit.

CODE:

#config celery, the task 'add' will be executed every 60 seconds
app.conf.update(
                CELERYBEAT_SCHEDULE = {
                "add": {
                "task": "tasks.add",
                "schedule": timedelta(seconds=60),
                }, },
                )
                
@app.task
def add():
    
    #read tokens of all user's in the database
    tokens = read_tokens()
    
    #iteratively fetch tweets of each user
    #all tasks are async tasks, so will not affect each other
    for item in tokens:
        get_tweet.delay(item)
        
#method to read all tokens of the users from database
def read_tokens():
    
    r.connect(host = 'secret', port = 28015, db='lovelace', password = "secret").repl()
    tokens=r.db('lovelace').table('user_tokens').run()
    return tokens

So the task ‘add’ will be executed every 60 seconds. The ‘add’ task will first read all tokens from the table ‘user_tokens’ in the database. The ‘user_tokens’ table is like a central table which specifies whose tweets Celery should fetch and store. It stores all the tokens of the users who have currently logged into our app. Every time after a user login, the Flask server will insert the user’s OAuth token into the table so Celery will start fetching the tweets of the user. If there is no need to fetch tweets for the user, for example if a user has logged out for a long time, we can simply delete the user’s token and Celery will not fetch the user’s tweets anymore.

After getting the tokens, it will iteratively use the tokens to fetch new tweets from the Twitter REST API and save the tweets into the database by calling the ‘get_tweet’ method. The .delay() means each ‘get_tweet’ task is executed asynchronously, so they won’t affect each other.

CODE:

#get tweets
@app.task(bind=True)
def get_tweet(self, token):
    #connect to database
    r.connect(host='secret', port=28015, db='lovelace', password="secret").repl()
    
    created_at = token['created_at']
    print(created_at)
    now = r.now().to_epoch_time().run()
    print(now - created_at)
    screen_name = token['screen_name']
    #check the time iterval between the time when token is inserted and the time when next celery task executes
    #if the interval is less than 64 seconds, we wait for another 64 seconds,this is to avoid sending two requests to the Twitter API in one minute
    #we can only send one request each 64 seconds
    if (now - created_at) >= 64:
        
        #authentication
        auth = tweepy.OAuthHandler(consumer_key=token['consumer_key'], consumer_secret=token['consumer_secret'])
        auth.set_access_token(token['access_token'], token['access_secret'])
        
        api = tweepy.API(auth)
        
        #fetch user's home timeline and insert it into database
        #here is an error handling, if the rate limit exceed, the task will be retried after 5 seconds
        try:
            #since_id is the id of the newest tweet of user's home timeline in the database
            since_id = r.db('lovelace').table('tweets').filter({'screen_name':'xinqili123'}).max('tweet_id').run()
            #only fetch the tweets whose ids are greater than the since_id, to avoid fetching duplicate tweets
            new_tweets = [tweet._json for tweet in api.home_timeline(count=200,since_id = since_id['tweet_id'])]
            #insert each tweet into database
            for item in new_tweets:
                r.db('lovelace').table('tweets').insert({'screen_name': screen_name,'tweet_id':item['id'], 'tweet': item}).run()
            #check rate limit remaining
            limit = api.rate_limit_status()
            return limit['resources']['statuses']['/statuses/home_timeline']
        except tweepy.RateLimitError as exc:
            raise self.retry(exc=exc,countdown=5,max_retries=10)

This solution is more like we are building a streaming API manually. But this solution has a maximum one minute latency. For example, if new tweets come immediately after a task was just executed. The new tweets will have to wait one minute until the next task execution to be stored into database. It means sometimes, the user will see the new tweets a little bit later than the Twitter official app. One workaround for that problem would be to have the iOS app make a direct User Stream connection to Twitter. However after a team discussion we decided not to do that because we believe that that the main purpose of our app is providing a better recommendation for the users, so one minute latency is acceptable.

Challenges & Next Tasks

  • Recommender System: The Recommender System now fulfils its most core tasks, filtering and recommending tweets. As of this time of writing, the recommender system is directly connecting with the twitter API and performing several operations (Creation of term frequency document, filtering of bad language/spam tweets, ranking user terms and so on) on these tweets. Next steps involve changing the recommender system to allow it to take in textual tweets from the flask server, and perform recommendations based on this input. Following this simple task,the recommender system should also take hashtag significance into account. Currently, hashtags are either ignored or treated as simple textual words. This will be changed so that hashtags are added to the term frequency document where applicable, as a once-off hashtag is likely to not be very significant to the user.

  • iOS App: Preview of photo and video attached to the tweet in the home timeline. Also, when the user taps on the photo or video to view it can be a point to collect his/her interest.

  • Continuous Deployment: Setting up a continuous testing and deployment environment with Docker turned out to be a bigger challenge than we anticipated. We tried a few SaaS solutions but mostly CircleCI, Distelli, and Docker Cloud. In general we got the feeling that Docker support was very much in its infancy. For example one of the great advantages that gives Docker its speed is the layer technology. However CI/CD SaaS solutions can’t (yet) benefit from that because each build runs in an isolated environment (no layer history). That means the Docker images have to be constructed from scratch every time, taking minutes instead of seconds. One of the problems we struggled with was a that these SaaS providers use an old version of the docker engine, v1.9.1 while the latest version is v1.11.2 with v1.12 around the corner. We ran into an issue where we were unable to stop running containers, a bug in v1.9.1 that has since been fixed. We will spend a few more hours to try to get it to work (we are very close with some hacks) but if it fails we’ll not spend more time on it.

  • Celery + Twitter API: Build up a more reliable and stable background task which continously fetches tweets for users.

Until next time!

On behalf of λ Lovelace
- Xinqi Li