Celery Flask --- error: [Errno 111] Connection refused


After running through the basic example for flask-celery(runs fine as far as I can tell) I'm trying to integrate this to my own project. Basically, I'm using this below:

from flask import Blueprint, jsonify, request, session
from flask.views import MethodView
from celery.decorators import task

blueprint = Blueprint('myapi', __name__)

class MyAPI(MethodView):

    def get(self, tag):
        return get_resource.apply_async(tag)

def get_resource(tag):

with the same setup as in the example, I'm getting this error:

Traceback (most recent call last):
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1518, in __call__
    return self.wsgi_app(environ, start_response)
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1506, in wsgi_app
    response = self.make_response(self.handle_exception(e))
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1504, in wsgi_app
    response = self.full_dispatch_request()
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1264, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1262, in full_dispatch_request
    rv = self.dispatch_request()
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1248, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 84, in view
    return self.dispatch_request(*args, **kwargs)
  File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 151, in dispatch_request
    return meth(*args, **kwargs)
  File "/x/api/modules/document/document.py", line 14, in get
    return get_resource.apply_async(tag)
  File "/x/venv/lib/python2.7/site-packages/celery/app/task/__init__.py", line 449, in apply_async
    publish = publisher or self.app.amqp.publisher_pool.acquire(block=True)
  File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 657, in acquire
    R = self.prepare(R)
  File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare
    p = p()
  File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda>
    return lambda: self.create_producer()
  File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 265, in create_producer
    pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
  File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 328, in TaskPublisher
    return TaskPublisher(*args, **self.app.merge(defaults, kwargs))
  File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 158, in __init__
    super(TaskPublisher, self).__init__(*args, **kwargs)
  File "/x/venv/lib/python2.7/site-packages/kombu/compat.py", line 61, in __init__
    super(Publisher, self).__init__(connection, self.exchange, **kwargs)
  File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 79, in __init__
  File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 168, in revive
    channel = channel.default_channel
  File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 581, in default_channel
  File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 574, in connection
    self._connection = self._establish_connection()
  File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 533, in _establish_connection
    conn = self.transport.establish_connection()
  File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 279, in establish_connection
  File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 89, in __init__
    super(Connection, self).__init__(*args, **kwargs)
  File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/connection.py", line 129, in __init__
    self.transport = create_transport(host, connect_timeout, ssl)
  File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 281, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 85, in __init__
    raise socket.error, msg
error: [Errno 111] Connection refused


I'm using redis, and if I install rabbitmq I get another error, but I do not understand this right now --the broker should be redis but its isn't finding it or what? Can anyone give me more of a clue what is going on here? Do I need to import something else, etc. The point is, there is very little beyond the bare bones example and this makes no sense to me.

The most I've been able to determine as that in the Api module there is no access to the 'celery' and when it goes to try and put data there when at the app level, the celery there falls into some defaults, which aren't installed because I'm pointing to redis. Just a guess. I haven't been able to import information into the module, only determined that calling anything 'celery'(for example, output celery.conf) from the app causes an error -- although I could import celery.task.

This is the broker config the application is using, direct from the example:

BROKER_URL = 'redis://localhost:6379/0'
CELERY_REDIS_HOST = "localhost"


If you'd like to see a demo: https://github.com/thrisp/flask-celery-example

AS it turns out having BROKER_TRANSPORT = 'redis' in your settings is important for whatever is being passed that I'm passing in (for the setup I put forth here and in the git example), I'm not entirely sure why it isn't in the example bits, but is in the ones I've added but it is -- without this it wants to dump everything onto a default ampq queue.


Also, this a rather a big deal, using the upcoming version of Celery simplifies 10,000 issues when using it with Flask, making all of this unecesssary.

6/6/2012 11:52:28 PM

You must configure redis to bind the localhost. In /etc/redis/redis.conf, uncomment the line with

9/12/2012 4:37:54 PM

Licensed under: CC-BY-SA with attribution
Not affiliated with: Stack Overflow