Targeted broadcasting of multithreaded results.

Okay, so here’s a basic problem. You’re building an iOS application (or an Android application) which needs to download an image from a remote site for display in a view.

So you write code similar to the following:

- (void)setImageUrlTest:(NSString *)url
{
	/*
	 *	Request the download on a background thread. Once we've downloaded the
	 *	results, we kick off a new block in the main thread to update the
	 *	image for this image object, and then animate a fade-in
	 */
	
	dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
		NSURLResponse *resp;
		NSURLRequest *req = [NSURLRequest requestWithURL:[NSURL URLWithString:url]];
		NSData *data = [NSURLConnection sendSynchronousRequest:req returningResponse:&resp error:nil];
		if (data) {
			/*
			 *	We have the data from the remote server. Kick off into the main
			 *	thread the UI update
			 */
			
			dispatch_async(dispatch_get_main_queue(), ^{
				/*
				 *	Be a little tricky: fade in from transparent with a half
				 *	second delay
				 */
				
				UIImage *image = [UIImage imageWithData:data];
				self.alpha = 0;
				self.image = image;
				[UIView animateWithDuration:0.5 animations:^{
					self.alpha = 1;
				}];
			});
		} else {
			NSLog(@"An error occurred downloading image");
		}
	});
}

If we were doing this on Android, we could do this using a Java thread. (Ideally we’d want to create a thread queue, but for illustration purposes we will just create a new thread.)

    void setImageUrlTest(final String url)
    {
        final Handler h = new Handler();
        
        /*
         *  Request the download on a background thread. Once we've downloaded
         *  the results, we kick off a runnable in the main thread using the
         *  handler created above, animating a fade-in
         */
        new Thread() {
            @Override
            public void run()
            {
                try {
                    URL u = new URL(url);
                    URLConnection conn = u.openConnection();
                    InputStream is = conn.getInputStream();
                    final Bitmap bmap = BitmapFactory.decodeStream(is);
                    is.close();
                    
                    /*
                     * If we get here we have a bitmap. Post a runnable which
                     * will set the image in the main event loop
                     */
                    h.post(new Runnable() {
                        @Override
                        public void run()
                        {
                            /* Views may only be touched from the main thread */
                            setVisibility(View.INVISIBLE);
                            setImageBitmap(bmap);
                            AlphaAnimation a = new AlphaAnimation(0.0f, 1.0f);
                            a.setDuration(500);
                            a.setAnimationListener(new AnimationListener() {
                                @Override
                                public void onAnimationEnd(Animation animation)
                                {
                                }

                                @Override
                                public void onAnimationRepeat(Animation animation)
                                {
                                }

                                @Override
                                public void onAnimationStart(Animation animation)
                                {
                                    setVisibility(View.VISIBLE);
                                }
                            });
                            startAnimation(a);
                        }
                    });
                }
                catch (Throwable th) {
                    Log.d("DownloadImageView","Failed to download image " + url, th);
                }
            }
        }.start();
    }

In both cases we kick off a background task which downloads the image. Then, using a reference to the original image view, we load the image into the image view (doing so on the main thread, where all UI tasks need to take place), and finally we trigger an animation which fades in the view.

Question: What happens if the image view goes away?

On Android and on iOS, it’s fairly routine to have a slow internet connection, and the user may dismiss your view controller or activity before the image finishes downloading.

But there is a problem with that.

On iOS, the block object that you create has an implicit ‘retain’ on the UIImageView object. This means that, until the network operation completes, all the resources associated with the UIImageView cannot be released.

Things get worse on Android, which (ironically enough) typically gives applications much less memory to work with: not only can’t the ImageView object go away, but the ImageView object holds a reference to the activity that contains it. So not only is the ImageView object retained by the anonymous thread declaration, but so is the containing activity, along with the entire rest of the view hierarchy and all other resources associated with that activity.

Given that a network timeout can be up to 30 seconds, this means if the user is browsing in and out of different screens, you can very quickly run out of memory as memory is filled up with defunct views whose sole purpose is to exist as a target for a network activity that is no longer really necessary.

What to do?

Okay, the following is not a viable solution, despite my seeing it in multiple places:

	__weak UIImageView *weakSelf = self;
	dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
		NSURLResponse *resp;
		NSURLRequest *req = [NSURLRequest requestWithURL:[NSURL URLWithString:url]];
		NSData *data = [NSURLConnection sendSynchronousRequest:req returningResponse:&resp error:nil];
		if (data) {
			/*
			 *	We have the data from the remote server. Kick off into the main
			 *	thread the UI update
			 */
			
			dispatch_async(dispatch_get_main_queue(), ^{
				/*
				 *	Be a little tricky: fade in from transparent with a half
				 *	second delay
				 */
				
				UIImage *image = [UIImage imageWithData:data];
				weakSelf.alpha = 0;
				weakSelf.image = image;
				[UIView animateWithDuration:0.5 animations:^{
					weakSelf.alpha = 1;
				}];
			});
		} else {
			NSLog(@"An error occurred downloading image");
		}
	});

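This doesn’t work reliably. If the download is delayed and the image view goes away, a non-zeroing reference (__unsafe_unretained, or a __block variable under manual reference counting) now points to garbage, causing a crash. A zeroing __weak reference under ARC becomes nil instead, which avoids the crash, but the download still runs to completion for a view that no longer exists.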

Broadcast to the rescue

One possible solution on iOS is to use NSNotificationCenter to send notifications when a network operation completes successfully. The advantage of using NSNotificationCenter (or an equivalent broadcast/receive mechanism on Android) is that the receiver is effectively detached from the broadcaster: a receiver can detach itself on release, and if the broadcast message has no receivers, the message is dropped on the floor.

And in our case this is the right answer: once the image view has gone away we don’t care what the resulting image was supposed to be.
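To make the broadcast idea concrete, here is a minimal sketch in Java. The BroadcastCenter class and its method names are made up for illustration and are not the API of NSNotificationCenter or of any Android class; the only point is to show a receiver detaching itself and a message with no receivers being dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical, minimal notification center: names map to lists of receivers. */
class BroadcastCenter {
    private final Map<String, List<Consumer<Object>>> receivers = new HashMap<>();

    /** Register a receiver for a named notification. */
    synchronized void addReceiver(String name, Consumer<Object> receiver) {
        receivers.computeIfAbsent(name, k -> new ArrayList<>()).add(receiver);
    }

    /** Detach a receiver, for example when the owning view is released. */
    synchronized void removeReceiver(String name, Consumer<Object> receiver) {
        List<Consumer<Object>> list = receivers.get(name);
        if (list == null) return;
        list.remove(receiver);
        if (list.isEmpty()) receivers.remove(name);
    }

    /** Deliver a payload; returns how many receivers saw it (0 means dropped). */
    int post(String name, Object payload) {
        List<Consumer<Object>> snapshot;
        synchronized (this) {
            List<Consumer<Object>> list = receivers.get(name);
            if (list == null) return 0;        // nobody is listening: drop it
            snapshot = new ArrayList<>(list);  // copy so receivers may detach while we iterate
        }
        for (Consumer<Object> r : snapshot) r.accept(payload);
        return snapshot.size();
    }
}
```

The broadcaster never holds anything but the receiver callback, and once removeReceiver has been called a later post for that name simply returns 0.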

There is a problem with this, though: the code which handles the incoming message is also separated logically from the code that makes the request. And unless you insert code into the broadcast receiver which explicitly detaches the image view from the NSNotificationCenter once a response is received, you can have dozens (or even hundreds) of broadcast receivers listening to each incoming image.

Another way: CMBroadcastThreadPool

By combining the semantics of a block notification system with a broadcast/receiver pair we can circumvent these problems.

Internally we maintain a map between a request and the block receiving the response. Each block also is associated with a ‘handler’ object; the object which effectively ‘owns’ the response. So, in the case of our image view, the ‘handler’ object is our image view itself.
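As a rough picture of that table, here is a small Java sketch. ReceiverTable, its Entry type and the pendingFor method are names I am inventing for the illustration; the real classes appear later in the post and differ in detail. The sketch only shows the bookkeeping: each request maps to a list of (handler, response) pairs, and removing a handler walks the whole table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of the request -> (handler, response block) table. */
class ReceiverTable<Req, Resp> {
    /** One pending callback together with the object that owns it. */
    private static final class Entry<R> {
        final Object handler;
        final Consumer<R> response;

        Entry(Object handler, Consumer<R> response) {
            this.handler = handler;
            this.response = response;
        }
    }

    private final Map<Req, List<Entry<Resp>>> table = new HashMap<>();

    /** Remember that handler wants response invoked when request completes. */
    synchronized void add(Req request, Object handler, Consumer<Resp> response) {
        table.computeIfAbsent(request, k -> new ArrayList<>())
             .add(new Entry<>(handler, response));
    }

    /** Drop every callback owned by handler; forget requests nobody waits for. */
    synchronized void removeHandler(Object handler) {
        Iterator<List<Entry<Resp>>> it = table.values().iterator();
        while (it.hasNext()) {
            List<Entry<Resp>> entries = it.next();
            entries.removeIf(e -> e.handler == handler);  // identity, not equals()
            if (entries.isEmpty()) it.remove();
        }
    }

    /** Number of callbacks still waiting on a request. */
    synchronized int pendingFor(Req request) {
        List<Entry<Resp>> entries = table.get(request);
        return entries == null ? 0 : entries.size();
    }
}
```

Comparing handlers by identity mirrors the idea that the handler is a specific view object, not a value.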

When our image view goes away, we can notify our thread pool object using the removeHandler: method; this walks through the table of requests and response blocks, deleting those response blocks associated with the handler being deleted:

- (void)dealloc
{
	[[CMNetworkRequest shared] removeHandler:self];
}

Note: CMNetworkRequest inherits from CMBroadcastThreadPool to implement network semantics. More information later.

We can then submit a request using the request:handler:response: method; this takes in a request (in our case, the NSURLRequest for an image), the handler (that is, the object which will be receiving the response) and the block to invoke when the response is received.

- (void)setImageUrl:(NSString *)url
{
	CMNetworkRequest *req = [CMNetworkRequest shared];
	[req removeHandler:self];	/* Remove old handler */
	NSURLRequest *rurl = [NSURLRequest requestWithURL:[NSURL URLWithString:url]];
	[req request:rurl handler:self response:^(NSData *data) {
		UIImage *image = [UIImage imageWithData:data];
		self.alpha = 0;
		self.image = image;
		[UIView animateWithDuration:0.5 animations:^{
			self.alpha = 1;
		}];
	}];
}

The call to request:handler:response: stores the handler and response in association with the request, then executes the request in a background thread. Once the response is received, the CMBroadcastThreadPool object looks up the handler and block to invoke, and if present, invokes the block.

However, if the UIImageView has gone away, there are no blocks to invoke–and the network response is dropped on the floor.
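The whole flow can be sketched in a few lines of Java. This is a simplified stand-in, not the class presented in this post: MiniBroadcastPool, its fetch function (which replaces the real network call) and shutdownAndWait are assumptions made for the example, requests and responses are plain strings, and callbacks run on the worker thread instead of hopping to the main thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch: run a request in the background, deliver only to surviving handlers. */
class MiniBroadcastPool {
    /** An entry exists for a request exactly while its background work is in flight. */
    private final Map<String, Map<Object, Consumer<String>>> receivers = new HashMap<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final Function<String, String> fetch;  // stands in for the network call

    MiniBroadcastPool(Function<String, String> fetch) {
        this.fetch = fetch;
    }

    /** Store handler and response for the request; start work only for the first caller. */
    synchronized void request(String req, Object handler, Consumer<String> response) {
        boolean alreadyRunning = receivers.containsKey(req);
        receivers.computeIfAbsent(req, k -> new IdentityHashMap<>()).put(handler, response);
        if (!alreadyRunning) {
            workers.execute(() -> deliver(req, fetch.apply(req)));
        }
    }

    /** Forget every response owned by handler. */
    synchronized void removeHandler(Object handler) {
        for (Map<Object, Consumer<String>> m : receivers.values()) m.remove(handler);
    }

    /** Look up whoever is still registered and invoke them; otherwise the result is dropped. */
    private void deliver(String req, String result) {
        List<Consumer<String>> toCall;
        synchronized (this) {
            Map<Object, Consumer<String>> m = receivers.remove(req);
            toCall = (m == null) ? new ArrayList<>() : new ArrayList<>(m.values());
        }
        for (Consumer<String> c : toCall) c.accept(result);
    }

    /** Let queued work finish, then stop the workers. */
    void shutdownAndWait() {
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

If two views ask for the same URL and one of them calls removeHandler before the fetch returns, only the surviving view's callback runs; if both have gone away the result is simply discarded.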


Internally our CMBroadcastThreadPool class on iOS calls the method responseForRequest: to process the request. The call is made from a block dispatched to a background queue via Grand Central Dispatch.

The class itself is presented in full here:

CMBroadcastThreadPool.h

//
//  CMBroadcastThreadPool.h
//  TestThreadPool
//
//  Created by William Woody on 7/20/13.
//  Copyright (c) 2013 William Woody. All rights reserved.
//

#import <Foundation/Foundation.h>

@interface CMBroadcastThreadPool : NSObject
{
	@private
		NSMutableSet *inProcess;
		NSMutableDictionary *receivers;
}

- (void)request:(id<NSObject, NSCopying>)request handler:(id<NSObject>)h response:(void (^)(id<NSObject>))resp;
- (void)removeHandler:(id<NSObject>)h;

/* Override this method for processing requests */
- (id<NSObject>)responseForRequest:(id<NSObject, NSCopying>)request;

@end

CMBroadcastThreadPool.m

//
//  CMBroadcastThreadPool.m
//  TestThreadPool
//
//  Created by William Woody on 7/20/13.
//  Copyright (c) 2013 William Woody. All rights reserved.
//

#import "CMBroadcastThreadPool.h"

/************************************************************************/
/*																		*/
/*	Internal Storage													*/
/*																		*/
/************************************************************************/

@interface CMBroadcastStore : NSObject
@property (retain) id<NSObject> handler;
@property (copy) void (^response)(id<NSObject>);
@end

@implementation CMBroadcastStore

#if !__has_feature(objc_arc)
- (void)dealloc
{
	[_handler release];
	[_response release];
	[super dealloc];
}
#endif

@end

/************************************************************************/
/*																		*/
/*	Thread pool															*/
/*																		*/
/************************************************************************/

@implementation CMBroadcastThreadPool

- (id)init
{
	if (nil != (self = [super init])) {
		inProcess = [[NSMutableSet alloc] initWithCapacity:10];
		receivers = [[NSMutableDictionary alloc] initWithCapacity:10];
	}
	return self;
}

#if !__has_feature(objc_arc)
- (void)dealloc
{
	[inProcess release];
	[receivers release];
	[super dealloc];
}
#endif

/*	request:handler:response:
 *
 *		Submit a request that will be sent to the specified handler, via the
 *	response block
 */

- (void)request:(id<NSObject, NSCopying>)request handler:(id<NSObject>)h response:(void (^)(id<NSObject>))resp
{
	@synchronized(self) {
		/*
		 *	Add this to the map of handlers for this request
		 */
		
		NSMutableArray *recarray = [receivers objectForKey:request];
		if (!recarray) {
			recarray = [[NSMutableArray alloc] initWithCapacity:10];
			[receivers setObject:recarray forKey:request];
#if !__has_feature(objc_arc)
			[recarray release];
#endif
		}
		
		CMBroadcastStore *store = [[CMBroadcastStore alloc] init];
		store.handler = h;
		store.response = resp;
		[recarray addObject:store];
#if !__has_feature(objc_arc)
		[store release];
#endif

		/*
		 *	Now enqueue a request in GCD. This only enqueues the item if the
		 *	item is not presently in the queue.
		 */
		 
		if (![inProcess containsObject:request]) {
			[inProcess addObject:request];
			
			dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
				
				/* Process for response */
				id<NSObject> response = [self responseForRequest:request];
				
				/* Get the list of receivers; if any exist, trigger response */
				@synchronized(self) {
					NSMutableArray *a = [receivers objectForKey:request];
					if (a) {
						dispatch_async(dispatch_get_main_queue(), ^{
							for (CMBroadcastStore *s in a) {
								s.response(response);
							}
						});
						[receivers removeObjectForKey:request];
					} else {
						NSLog(@"No receivers");
					}
					
					/* Remove from list of items in queue */
					[inProcess removeObject:request];
				}
			});
		}
		
	}
}

/*	removeHandler:
 *
 *		Remove the handler, removing all the receivers for this incoming
 *	message
 */

- (void)removeHandler:(id<NSObject>)h
{
	@synchronized(self) {
		NSArray *keys = [receivers allKeys];
		for (id request in keys) {
			/*
			 *	Remove selected handlers associated with this receiver
			 */
			
			NSMutableArray *a = [receivers objectForKey:request];
			NSInteger i, len = (NSInteger)[a count];
			for (i = len-1; i >= 0; --i) {
				CMBroadcastStore *store = [a objectAtIndex:i];
				if (store.handler == h) {
					[a removeObjectAtIndex:i];
				}
			}
			if ([a count] <= 0) {
				/*
				 *	If empty, remove the array of responses.
				 */
				
				[receivers removeObjectForKey:request];
			}
		}
	}
}

/*	responseForRequest:
 *
 *		This returns a response for the specified request. This should be
 *	overridden if possible
 */

- (id<NSObject>)responseForRequest:(id<NSObject, NSCopying>)request
{
	return nil;
}

@end

In Java

For Java I’ve done the same sort of thing, except I’ve also included a thread pool mechanism which manages a finite number of background threads. I’ve also added code which drops a request entirely if nobody is waiting for it any more and it has not yet started processing. For example, if you’re attempting to download an image, but the image view goes away, and the request to download the image hasn’t started being processed by a background thread, then we drop the request entirely.
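The "drop it if it hasn't started" rule boils down to a little queue bookkeeping. Here is a standalone sketch of just that part; PendingQueue and its method names are hypothetical and exist only for this illustration.

```java
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

/** Hypothetical sketch of the queue bookkeeping: cancel only what has not started. */
class PendingQueue<Req> {
    private final LinkedList<Req> queue = new LinkedList<>();
    private final Set<Req> inProcess = new HashSet<>();

    /** Queue a request unless the same request is already waiting or running. */
    synchronized void enqueue(Req r) {
        if (!queue.contains(r) && !inProcess.contains(r)) queue.addLast(r);
    }

    /** Called by a worker: take the next request and mark it as started. */
    synchronized Req takeNext() {
        Req r = queue.pollFirst();
        if (r != null) inProcess.add(r);
        return r;
    }

    /** Called by a worker when processing is complete. */
    synchronized void finished(Req r) {
        inProcess.remove(r);
    }

    /** Returns true if the request was still waiting and has now been dropped. */
    synchronized boolean cancelIfNotStarted(Req r) {
        return !inProcess.contains(r) && queue.remove(r);
    }

    /** Number of requests that have not been picked up by a worker yet. */
    synchronized int waiting() {
        return queue.size();
    }
}
```

A request that a worker has already picked up is left alone, since interrupting a download halfway is a separate problem; its response just finds no receivers when it arrives.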

To use in Android you need to override runOnMainThread to put the runnable into the main thread. (This can be done using Android’s ‘Handler’ class.) You also need to provide the processRequest method.
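Since Android classes won't run outside a device, here is the shape of such an override sketched in plain Java, with a single-thread executor pretending to be the UI message loop. FakeMainLoop and threadThatDelivers are invented for the example; on Android I would expect the body of runOnMainThread to be a post call on a Handler bound to the main looper instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a UI message loop, used to show the override's shape. */
class FakeMainLoop {
    static final String NAME = "fake-main";

    /** One dedicated thread plays the role of the main (UI) thread. */
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, NAME);
        t.setDaemon(true);
        return t;
    });

    /** Same contract as runOnMainThread: queue the runnable for the main loop. */
    void runOnMainThread(Runnable r) {
        loop.execute(r);
    }

    /** Posts from a worker thread and reports which thread actually ran the runnable. */
    String threadThatDelivers() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> runOnMainThread(() -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        }), "worker");
        worker.start();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }
}
```

The runnable is created on the worker thread but executes on the loop thread, which is the behavior the response callbacks need.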

BroadcastThreadPool.java

/*  BroadcastThreadPool.java
 *
 *  Created on Jul 20, 2013 by William Edward Woody
 */

package com.glenviewsoftware.bthreadpool;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;

/**
 * A thread pool which sends the response to a request to a broadcast list,
 * which can be disposed of at any time.
 */
public abstract class BroadcastThreadPool<Response,Request,Handler>
{
    /// The number of threads that can simultaneously run
    private int fMaxThreads;
    
    /// The number of threads currently operating
    private int fCurThreads;
    
    /// The number of waiting threads
    private int fWaitingThreads;
    
    /// The request queue; a queue of requests to be processed
    private LinkedList<Request> fRequestQueue;
    
    /// The list of requests that are being processed
    private HashSet<Request> fInProcess;
    
    /// The request response mapping; this maps requests to their responses.
    private HashMap<Request,HashMap<Handler,ArrayList<Receiver<Response>>>> fReceivers;
    
    
    private class BackgroundThread implements Runnable
    {

        @Override
        public void run()
        {
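            /* The counters are shared with request(), so update them under the lock */
            synchronized(BroadcastThreadPool.this) {
                ++fCurThreads;
                ++fWaitingThreads;
            }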
            
            for (;;) {
                /*
                 * Get the current request
                 */
                
                Request req;
                
                synchronized(BroadcastThreadPool.this) {
                    while (fRequestQueue.isEmpty()) {
                        try {
                            BroadcastThreadPool.this.wait();
                        }
                        catch (InterruptedException e) {
                        }
                    }
                    
                    /* When we get a request, note it's in process and mark thread in use */
                    req = fRequestQueue.removeFirst();
                    fInProcess.add(req);
                    --fWaitingThreads;
                }
                
                /*
                 * Process the response. If an exception happens, set the response
                 * to null. (Is this right?)
                 */
                
                Response resp;
                try {
                    resp = processRequest(req);
                }
                catch (Throwable th) {
                    /* Unexpected failure. */
                    resp = null;
                }
                
                /*
                 * Now send the response
                 */
                
                synchronized(BroadcastThreadPool.this) {
                    /* Remove request from list of in-process requests */
                    fInProcess.remove(req);
                    
                    /* Get map of receivers for this request and remove from list */
                    final HashMap<Handler,ArrayList<Receiver<Response>>> rmap = fReceivers.get(req);
                    fReceivers.remove(req);
                    
                    /* Send response to all receivers */
                    final Response respFinal = resp;
                    runOnMainThread(new Runnable() {
                        @Override
                        public void run()
                        {
                            for (ArrayList<Receiver<Response>> l: rmap.values()) {
                                for (Receiver<Response> rec: l) {
                                    rec.processResponse(respFinal);
                                }
                            }
                        }
                    });
                    
                    ++fWaitingThreads;
                }
            }
        }
        
    }
    
    /**
     *	Broadcast receiver. This is the interface to the abstract class which
     *  will receive the response once the message has been processed.
     */
    public interface Receiver<Response>
    {
        void processResponse(Response r);
    }
    
    /**
     * Create a new thread pool which uses a broadcast/receiver pair to handle
     * requests.
     * @param maxThreads
     */
    public BroadcastThreadPool(int maxThreads)
    {
        fMaxThreads = maxThreads;
        
        fReceivers = new HashMap<Request,HashMap<Handler,ArrayList<Receiver<Response>>>>();
        fRequestQueue = new LinkedList<Request>();
        fInProcess = new HashSet<Request>();
    }
    
    /**
     * This enqueues the request and adds the response to the list of listeners
     * @param r
     * @param h
     * @param resp
     */
    public synchronized void request(Request r, Handler h, Receiver<Response> resp)
    {
        boolean enqueue = false;
        
        /*
         *  Step 1: Add this to the hash map of handlers, creating a new
         *  entry for the request if we don't have one yet
         */
        
        HashMap<Handler,ArrayList<Receiver<Response>>> rm = fReceivers.get(r);
        if (rm == null) {
            rm = new HashMap<Handler,ArrayList<Receiver<Response>>>();
            fReceivers.put(r,rm);
        }
        
        /* Enqueue unless the request is already queued or being processed */
        enqueue = !fInProcess.contains(r) && !fRequestQueue.contains(r);
        
        /*
         * Step 2: Add this receiver to the list of receivers associated with
         * the specified handler
         */
        ArrayList<Receiver<Response>> list = rm.get(h);
        if (list == null) {
            list = new ArrayList<Receiver<Response>>();
            rm.put(h, list);
        }
        list.add(resp);
        
        /*
         * Step 3: If we need to enqueue the request, then enqueue it and wake
         * up a thread to process. Note we only enqueue a request if the same
         * request hasn't already been enqueued.
         */
        
        if (enqueue) {
            fRequestQueue.addLast(r);
            
            if ((fWaitingThreads <= 0) && (fCurThreads < fMaxThreads)) {
                /*
                 * Create new processing thread
                 */
                
                Thread th = new Thread(new BackgroundThread(),"BThread Proc");
                th.setDaemon(true);
                th.start();
            }
            
            notify();
        }
    }
    
    /**
     * This is called when my handler is being disposed or going inactive and
     * is no longer interested in the response. The handler will be removed
     * and all of the responses will go away
     * @param h
     */
    public synchronized void remove(Handler h)
    {
        Iterator<Map.Entry<Request,HashMap<Handler,ArrayList<Receiver<Response>>>>> iter;
        
        /*
         * Iterate through all requests, removing handlers. If the request
         * goes empty, remove
         */
        
        iter = fReceivers.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Request,HashMap<Handler,ArrayList<Receiver<Response>>>> e = iter.next();
            
            HashMap<Handler,ArrayList<Receiver<Response>>> pm = e.getValue();
            pm.remove(h);
            if (pm.isEmpty()) {
                /*
                 * The request is no longer needed. If it is not in process,
                 * remove from the queue, as it doesn't need to be processed.
                 */
                
                Request req = e.getKey();
                if (!fInProcess.contains(req)) {
                    fRequestQueue.remove(req);
                    
                    /* Also forget the now-empty receiver map */
                    iter.remove();
                }
            }
        }
    }
    
    /**
     * runOnMainThread: override on an OS which requires responses on the
     * main thread. For now, this just executes directly
     * @param r
     */
    protected void runOnMainThread(Runnable r)
    {
        r.run();
    }

    /**
     * This is the abstract method executed in the thread queue which responds
     * with a specified response.
     * @param req
     * @return
     */
    protected abstract Response processRequest(Request req);
}

Here is an example of using this class to receive data from a remote network connection. We would pass in a string and receive a byte array of the data from the remote server.

NetworkThreadPool.java

/*  NetworkThreadPool.java
 *
 *  Created on Jul 22, 2013 by William Edward Woody
 */

package com.glenviewsoftware.bthreadpool;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;

public class NetworkThreadPool extends BroadcastThreadPool<byte[], String, Object>
{
    public NetworkThreadPool()
    {
        /* Max of 5 background threads */
        super(5);
    }
    
    private byte[] readFromInput(InputStream is) throws IOException
    {
        byte[] buffer = new byte[512];
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int rlen;
        
        while (0 < (rlen = is.read(buffer))) {
            baos.write(buffer, 0, rlen);
        }
        
        return baos.toByteArray();
    }

    @Override
    protected byte[] processRequest(String req)
    {
        try {
            URL url = new URL(req);
            URLConnection conn = url.openConnection();
            InputStream is = conn.getInputStream();
            byte[] buffer = readFromInput(is);
            is.close();
            return buffer;
        }
        catch (Throwable th) {
            // Handle exception
            return null;
        }
    }

    private static NetworkThreadPool gNetworkThreadPool;

    public static synchronized NetworkThreadPool shared()
    {
        if (gNetworkThreadPool == null) gNetworkThreadPool = new NetworkThreadPool();
        return gNetworkThreadPool;
    }
}

We can then invoke this by writing:

        NetworkThreadPool.shared().request(imageUrl, this, new NetworkThreadPool.Receiver<byte[]>() {
            @Override
            public void processResponse(byte[] r)
            {
                /* Convert to image bitmap and load */
                ...
            }
        });

and of course when the image view goes away, we can (in onDetachedFromWindow) write:

@Override
protected void onDetachedFromWindow()
{
    super.onDetachedFromWindow();
    NetworkThreadPool.shared().remove(this);
}

Disclaimer:

This code is loosely sketched together in order to illustrate the concept of using a thread pool married with a broadcast/receiver model to allow us to handle the case where the receiver no longer exists prior to a request completing in the background.

I believe the code works but it is not thoroughly tested. There may be better ways to do this.

Feel free to use any of the above code in your own projects.

And if you need a dynamite Android or iOS developer on a contract basis, drop me a line. 🙂
