I recently had an issue with a long-running web process that I needed to substantially speed up due to timeouts. The delay arose because the system needed to fetch data from a number of URLs. The total number of URLs varied from user to user, and the response time for each URL was quite long (circa 1.5 seconds).
Problems arose when 10-15 URL requests were taking over 20 seconds and my server’s HTTP connection was timing out. Rather than extending the timeout, I turned to Python’s threading library. It’s easy to learn, quick to implement, and solved my problem very quickly. The system was implemented in Python’s web micro-framework, Flask.
Using Threads for a low number of tasks
Threading in Python is simple. It allows you to manage concurrent threads doing work at the same time. The library is called “threading“, you create “Thread” objects, and they run target functions for you. You can start potentially hundreds of threads that will operate in parallel. The first solution was inspired by a number of StackOverflow posts and involves launching an individual thread for each URL request. This turned out not to be the ideal solution, but it provides a good learning ground.
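As a quick illustration of the basic pattern before we get to the URL-fetching code, here is a minimal, self-contained sketch; the say_hello function and its argument are purely illustrative:

```
from threading import Thread

def say_hello(name):
    print("Hello from a thread, %s" % name)

thread = Thread(target=say_hello, args=("world",))
thread.start()   # say_hello now runs in its own thread
thread.join()    # wait here until that thread has finished
```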
You first need to define a “work” function that each thread will execute separately. In this example, the work function is a “crawl” method that retrieves data from a URL. Returning values directly from threads is not possible and, as such, in this example we pass in a “results” list that is globally accessible (to all threads), along with the index of the list at which each thread should store its result once fetched. The crawl() function will look like:
```
...
import logging
from urllib2 import urlopen
from threading import Thread
from json import JSONDecoder
...

# Define a crawl function that retrieves data from a url and places the result in results[index]
# The 'results' list will hold our retrieved data
# The 'urls' list contains all of the urls that are to be checked for data
results = [{} for x in urls]

def crawl(url, result, index):
    # Keep everything in a try/except block so we handle errors
    try:
        data = urlopen(url).read()
        logging.info("Requested..." + url)
        result[index] = data
    except:
        logging.error('Error with URL check!')
        result[index] = {}
    return True
```
To actually start threads in Python, we use the “threading” library and create “Thread” objects. We can specify a target function (‘target’) and set of arguments (‘args’) for each thread and, once started, the threads will all execute the specified function in parallel. In this case, the use of threads will effectively reduce our URL lookup time to approximately 1.5 seconds no matter how many URLs there are to check. The code to start the threaded processes is:
```
# create a list of threads
threads = []

# In this case 'urls' is a list of urls to be crawled.
for ii in range(len(urls)):
    # We start one thread per url present.
    process = Thread(target=crawl, args=[urls[ii], results, ii])
    process.start()
    threads.append(process)

# We now pause execution on the main thread by 'joining' all of our started threads.
# This ensures that each has finished processing the urls.
for process in threads:
    process.join()

# At this point, results for each URL are now neatly stored in order in 'results'
```
The only peculiarity here is the join() function. Essentially, join() pauses the calling thread (in this case the main thread of the program) until the thread in question has finished processing. Calling join prevents our program from progressing until all URLs have been fetched.
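To see the effect of start() and join() in isolation, here is a small self-contained sketch; the sleep calls simply stand in for slow URL requests, and the total elapsed time is roughly that of the slowest task rather than the sum of all of them:

```
import time
from threading import Thread

def fake_request(seconds):
    # stand-in for a slow urlopen() call
    time.sleep(seconds)

start = time.time()
threads = [Thread(target=fake_request, args=(1.5,)) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# prints roughly 1.5, not 4.5, because the threads waited concurrently
print("Elapsed: %.1f seconds" % (time.time() - start))
```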
This method of starting one thread for each task will work well unless you have a high number (many hundreds) of tasks to complete.
Using Queue for a high number of tasks
The solution outlined above operated successfully for us, with users of our web application requiring, on average, 9-11 threads per request. The threads were starting, working, and returning results successfully. Issues arose later when some users required many more threads (>400). With such requests, Python was attempting to start hundreds of threads and receiving errors like:
```
error: can't start new thread
  File "/usr/lib/python2.5/threading.py", line 440, in start
    _start_new_thread(self.__bootstrap, ())
```
For these users, the original solution was not viable. There is a limit in your environment to the maximum number of threads that can be started by Python. Another of Python’s built-in libraries, Queue, can be used to get around this obstacle. A queue is essentially used to store a number of “tasks to be done”. Threads take tasks from the queue when they are available, do the work, and then go back for more. In this example, we needed to ensure a maximum of 50 threads at any one time, but with the ability to process any number of URL requests. Setting up a queue in Python is very simple:
```
# Setting up the Queue
...
from Queue import Queue
...

# set up the queue to hold all the urls
q = Queue(maxsize=0)

# Use many threads (50 max, or one for each url)
num_threads = min(50, len(urls))
```
To return results from the threads, we will use the same technique of passing a results list, along with an index for storage, to each worker thread. The index needs to be included in the Queue when setting up tasks, since we will not be explicitly calling each “crawl” function with arguments (and we have no guarantee of the order in which the tasks are executed).
```
# Populating Queue with tasks
results = [{} for x in urls]

# load up the queue with the urls to fetch and the index for each job (as a tuple):
for i in range(len(urls)):
    # need the index and the url in each queue item.
    q.put((i, urls[i]))
```
The threaded “crawl” function will be different since it now relies on the queue. The threads are set up to close and return when the queue is empty of tasks.
```
# Threaded function for queue processing.
def crawl(q, result):
    while not q.empty():
        work = q.get()                      # fetch new work from the Queue
        try:
            data = urlopen(work[1]).read()
            logging.info("Requested..." + work[1])
            result[work[0]] = data          # Store data back at correct index
        except:
            logging.error('Error with URL check!')
            result[work[0]] = {}
        # signal to the queue that the task has been processed
        q.task_done()
    return True
```
The new Queue object itself is passed to the threads along with the list for storing results. The final location for each result is contained within the queue tasks – ensuring that the final “results” list ends up in the same order as the original “urls” list. With the queue populated with this job information, we now start the worker threads:
```
# Starting worker threads on queue processing
for i in range(num_threads):
    logging.debug('Starting thread %d', i)
    worker = Thread(target=crawl, args=(q, results))
    worker.setDaemon(True)    # setting threads as "daemon" allows main program to
                              # exit eventually even if these don't finish
                              # correctly.
    worker.start()

# now we wait until the queue has been processed
q.join()

logging.info('All tasks completed.')
```
Our tasks will now not all be processed at once, but rather by 50 threads operating in parallel; 100 URLs will hence take approximately 2 x 1.5 seconds. Here, this delay was acceptable, since the number of users requiring more than 50 threads is minimal. However, at least the system is now flexible enough to handle any number of URLs.
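As an aside, if you are on Python 3, the same bounded-pool idea can be sketched with the concurrent.futures module; this is not the code used in the system above, just a rough equivalent for comparison (it assumes the same ‘urls’ list):

```
# Python 3 only: a rough equivalent of the bounded worker pool above.
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

def fetch(url):
    try:
        return urlopen(url).read()
    except Exception:
        return {}

# pool.map() preserves the order of 'urls' in the returned results
with ThreadPoolExecutor(max_workers=50) as pool:
    results = list(pool.map(fetch, urls))
```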
This setup is well suited to this example of non-computationally-intensive input/output work (fetching URLs), since much of the threads’ time will be spent waiting for data. For data-intensive or data science work, the multiprocessing or celery libraries can be better suited, since they split work across multiple CPU cores. Hopefully the content above gets you on the right track!
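As a final aside, a minimal multiprocessing sketch for that kind of CPU-bound work might look something like the following; the cpu_heavy function is just a placeholder for real computation:

```
from multiprocessing import Pool

def cpu_heavy(n):
    # placeholder for genuinely CPU-bound work
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    pool = Pool()                                 # one worker process per CPU core by default
    results = pool.map(cpu_heavy, [1000000] * 8)  # work is split across the processes
    pool.close()
    pool.join()
```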
Further information on Python Threading
There is some great further reading on threads and the threading module if you are looking for more in-depth information:
- Python Threading Module Tutorials on YouTube
- Threading information at PyMOTW
- Tutorial on Python Multithreaded Programming at tutorialspoint.com
- Python Programming and Threading tutorial on Python Programming WikiBooks
- Threads in Python on python-course.eu
thanks a lot for this… I was looking for some info around returning values from thread and your post helped.
Thanks for sharing.
I have a question about the “Threaded function for queue processing”: should we use a lock (acquire and release) to make sure there is no corruption when writing data to the result dictionary?
Hi Lam Do. The base Python data structures are designed to be thread-safe, so I think it’s ok in this instance. However, you will need to be careful that the same keys aren’t used in different threads. The data won’t be corrupted, but the results might be unexpected. Read more here: http://stackoverflow.com/questions/1312331/using-a-global-dictionary-with-threads-in-python
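For illustration only, an explicit lock around a shared write would look something like the sketch below; it isn’t required for the per-index list writes in the code above, and the names here are purely illustrative:

```
from threading import Lock, Thread

shared = {}
lock = Lock()

def store(key, value):
    with lock:              # only one thread writes to 'shared' at a time
        shared[key] = value

threads = [Thread(target=store, args=(i, i * i)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```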
Thank you for this, it’s very clear for someone like me who’s just starting with parallel computing in Python 🙂
Just one thing: apparently using `q.empty()` is considered not safe (https://docs.python.org/3/library/queue.html#queue.Queue.empty), and i found a quite convenient workaround for this:
* Loop infinitely
* Unpack the tuple result of `q.get()` immediately
* Catch an exception when something else than a tuple is retrieved from the queue, and break the infinite loop
* Send `None` to the queue after it’s been joined
The code:
```
def crawl(q, result):
    while True:
        try:
            index, work = q.get()
        except TypeError:
            break
        ...

q.join()
for i in range(num_threads):
    q.put(None)
```
The function is to be called in a separate thread for every user who calls it via a sender id,
and that function will return a list as its return value.
func()
…
return list
How do I call it from an @app.route where I have the sender id of a user?
This should result in not mixing the actions entered by sender 1 with the actions entered by sender 2. Please help me, guys.
Forgot to mention: since the function is called from the app route via a browser, the sender id increases dynamically every time, so I can’t limit the threads to a fixed number. So guys, please help.
This code has helped me a lot…
For num_threads = min(20, len(comp_list)): I am not able to increase the starting value beyond 20. It is producing a bootstrap error… Can anyone help me to increase this value?
Thanks bro, this code really saved my time. I was new to Python and had no prior experience with threading. The guide is simple and so easy for beginners like me, and I’ll take this time to let others know this post has some value in it.
It worked!
I have been trying to get a return value from multithreading queue to work for half a day and this saved me. The main things I had to watch was creation of the queue items and the firing of multiple threads. Couldn’t have done it without this tutorial. Thanks!
Can I please get a copy of this code, or a github link