Subscribers

This document gives an overview of how subscriptions work in Google Cloud Pub/Sub. For details on pull and push delivery subscriptions, see the Pull Subscriber Guide and the Push Subscriber Guide.

To receive messages published to a topic, you must create a subscription to that topic. The subscription connects the topic to a subscriber application that receives and processes messages published to the topic. A topic can have multiple subscriptions, but a given subscription belongs to a single topic.

Delivery guarantee

Google Cloud Pub/Sub offers an "at-least-once delivery" guarantee: each published message is delivered to a subscriber at least once for every subscription.

The at-least-once delivery guarantee is not absolute: each subscription has a configurable maximum retention time for a message. A message that could not be delivered within that time is deleted and is no longer accessible. This typically happens when subscribers do not keep up with the flow of messages. In addition, a message published before a given subscription was created will usually not be delivered to that subscription. Thus, messages published to a topic that has no subscriptions generally cannot be retrieved; at least one subscription must exist before the messages are published.

Once a message is sent to a subscriber, the subscriber must either acknowledge or drop the message. A message is considered outstanding once it has been sent out for delivery and before a subscriber acknowledges it. Google Cloud Pub/Sub will repeatedly attempt to deliver any message that has not been acknowledged and is not currently outstanding. Google Cloud Pub/Sub usually does not redeliver acknowledged messages. A subscriber has a configurable, limited amount of time, or ackDeadline, to acknowledge the message. Once the deadline has passed, an outstanding message becomes unacknowledged.

Typically, Pub/Sub delivers each message once and in the order in which it was published. However, messages may sometimes be delivered out of order or more than once. Accommodating more than one delivery requires your subscriber to be idempotent when processing messages. If ordering is important, we recommend that the publisher of the topic to which you subscribe include a sequence token in the message. See Message Ordering for more information.
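
The idempotency requirement above can be sketched as a small wrapper that remembers which message IDs have already been handled. This is a minimal illustration, not part of any Pub/Sub client library: the class name IdempotentHandler and its in-memory set are assumptions, and a real subscriber would probably keep the processed IDs in persistent storage.

```python
class IdempotentHandler:
    """Wraps a handler so that a redelivered message is processed only once.

    Hypothetical helper for illustration; not a Pub/Sub API.
    """

    def __init__(self, handler):
        self._handler = handler
        # In-memory only; a production subscriber would persist these IDs.
        self._processed_ids = set()

    def handle(self, message_id, data):
        """Return True if the message was processed, False if it was a duplicate."""
        if message_id in self._processed_ids:
            return False
        self._handler(data)
        # Record the ID only after the handler succeeded, so that a failed
        # attempt is retried on redelivery.
        self._processed_ids.add(message_id)
        return True
```

With this wrapper, a second delivery of the same message ID is detected and skipped, and the subscriber can still acknowledge it safely.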

Push and Pull subscriptions

A subscription can use either the push or pull mechanism for message delivery. You can change or configure the mechanism at any time. In push delivery, Pub/Sub initiates requests to your subscriber application to deliver messages. In pull delivery, your subscriber application initiates requests to the Pub/Sub server to retrieve messages.

  • For a push subscription, the Pub/Sub server sends each message as an HTTPS request to the subscriber application at a pre-configured endpoint. The endpoint acknowledges the message by returning an HTTP success status code. This indicates that the message has been successfully processed and the Pub/Sub system can delete it from the subscription. A non-success response indicates that the message should be resent. Pub/Sub dynamically adjusts the rate of push requests, based on the rate at which it receives success responses.

  • In a pull subscription, the subscribing application explicitly calls the pull method, which requests delivery of a message in the subscription queue. The Pub/Sub server responds with the message (or an error if the queue is empty), and an ack ID. The subscriber then explicitly calls the acknowledge method, using the returned ack ID, to acknowledge receipt.

The following table offers some guidance in choosing the appropriate delivery mechanism for your application:

| Pull | Push |
|---|---|
| Large volume of messages (many more than 1/second). | Subscribers with low traffic (less than 10,000/second). |
| Efficiency and throughput of message processing is critical. | Legacy push webhooks. |
| Public HTTPS endpoint, with non-self-signed SSL certificate, is not feasible to set up. | App Engine subscribers. |

The following table compares pull and push delivery:

| | Pull | Push |
|---|---|---|
| Endpoints | Any device on the internet is able to call the Pub/Sub API (and in possession of authorized credentials). | An HTTPS server with non-self-signed certificate accessible on the public web. The receiving endpoint may be decoupled from the Pub/Sub subscription, so that messages from multiple subscriptions may be sent to a single endpoint. |
| Load balancing | Multiple subscribers can make pull calls to the same "shared" subscription. Each subscriber will receive a subset of the messages. | The push endpoint can be a load balancer. |
| Configuration | No configuration is necessary. | No configuration is necessary for App Engine apps in the same project as the subscriber. Configuration (and verification) of push endpoints is required in the Google Cloud Platform Console for all other endpoints. Endpoints must be reachable via DNS names and have SSL certificates installed. |
| Latency | Delays will occur between message publication and delivery. | Delivery is immediate (except when delivery is being rate-limited to avoid overwhelming the endpoint). There is no added latency from pull requests. |
| Message handling and flow control | Message acknowledgment is explicit. The subscriber client controls the rate of delivery. The subscriber can dynamically modify the ack deadline, allowing message processing to be arbitrarily long. | The Pub/Sub server automatically implements flow control. There is no need to handle message flow at the client side (although it is possible to indicate that the client cannot handle the current message load by passing back an HTTP error). |
| Efficiency and throughput | Achieves high throughput at low CPU and bandwidth by allowing batched delivery and acknowledgments as well as massively parallel consumption. May be inefficient if aggressive polling is used to minimize message delivery time. | Delivers one message per request and limits maximum number of outstanding messages. |

Creating and configuring subscriptions

A subscription is created for a single topic. It has several properties that can be set at creation time or updated later, including:

  • Delivery method: By default, Pub/Sub subscriptions use the pull method. You can switch to push delivery by specifying a non-empty, valid HTTPS push endpoint URL. You can switch back to pull delivery by specifying an empty URL.
  • An acknowledgment deadline: If your code doesn't acknowledge the message before the deadline, the message is sent again. The default is 10 seconds. The maximum custom deadline you can specify is 600 seconds (10 minutes).
  • Maximum message retention duration: The maximum amount of time for which Google Cloud Pub/Sub will retain and attempt to deliver a message. Default and maximum values are seven days. Ten minutes is the minimum.
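
The limits listed above can be checked before a request is sent. The following is a minimal sketch, assuming a hypothetical helper named build_subscription_body that only assembles a JSON-ready dictionary and does not call the service. The topic, pushConfig, and ackDeadlineSeconds keys appear in the REST example in this document; the messageRetentionDuration key, its seconds-string format, and the lower bound of one second on the ack deadline are assumptions that should be checked against the API reference.

```python
MAX_ACK_DEADLINE_SECONDS = 600             # 10 minutes, the stated maximum
MIN_RETENTION_SECONDS = 10 * 60            # 10 minutes, the stated minimum
MAX_RETENTION_SECONDS = 7 * 24 * 60 * 60   # 7 days, the stated default and maximum


def build_subscription_body(topic, push_endpoint="", ack_deadline_seconds=10,
                            retention_seconds=MAX_RETENTION_SECONDS):
    """Return a dict that can be JSON-encoded as a create-subscription body.

    Hypothetical helper for illustration; it validates the documented limits
    and does not contact Pub/Sub.
    """
    # Assumption: any positive deadline up to the documented maximum is allowed.
    if not 0 < ack_deadline_seconds <= MAX_ACK_DEADLINE_SECONDS:
        raise ValueError("ack deadline must be between 1 and 600 seconds")
    if not MIN_RETENTION_SECONDS <= retention_seconds <= MAX_RETENTION_SECONDS:
        raise ValueError("retention must be between 10 minutes and 7 days")
    if push_endpoint and not push_endpoint.startswith("https://"):
        raise ValueError("push endpoint must be an HTTPS URL")

    body = {
        "topic": topic,
        "ackDeadlineSeconds": ack_deadline_seconds,
        # Assumed field name and format (duration expressed as seconds).
        "messageRetentionDuration": "{}s".format(retention_seconds),
    }
    # An empty endpoint means pull delivery, so pushConfig is omitted.
    if push_endpoint:
        body["pushConfig"] = {"pushEndpoint": push_endpoint}
    return body
```

Calling build_subscription_body with only a topic yields a pull subscription with the default deadline and retention; passing an HTTPS URL as push_endpoint adds the push configuration.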

REST API

PUT https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription
{
  "topic": "projects/someproject/topics/sometopic",
  // Only needed if you are using push delivery
  "pushConfig": {
    "pushEndpoint": "https://myproject.appspot.com/myhandler"
  }
}

-- Response:
200 OK
{
  "name": "projects/myproject/subscriptions/mysubscription",
  "topic": "projects/someproject/topics/sometopic",
  "pushConfig": {
    "pushEndpoint": "https://myproject.appspot.com/myhandler"
  },
  "ackDeadlineSeconds": 10
}

Java

try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
  // eg. projectId = "my-test-project", topicId = "my-test-topic"
  TopicName topicName = TopicName.create(projectId, topicId);
  // eg. subscriptionId = "my-test-subscription"
  SubscriptionName subscriptionName =
      SubscriptionName.create(projectId, subscriptionId);
  // create a pull subscription with default acknowledgement deadline
  Subscription subscription =
      subscriptionAdminClient.createSubscription(
          subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
  return subscription;
}

Python

from google.cloud import pubsub


def create_subscription(topic_name, subscription_name):
    """Create a new pull subscription on the given topic."""
    pubsub_client = pubsub.Client()
    topic = pubsub_client.topic(topic_name)

    subscription = topic.subscription(subscription_name)
    subscription.create()

    print('Subscription {} created on topic {}.'.format(
        subscription.name, topic.name))

Lifecycle of a Subscription

Subscriptions with no activity (push successes or pull requests) for 31 days may be deleted automatically. You can also delete a subscription manually. Although you can create a new subscription with the same name as a deleted one, the delivery guarantee applies to each subscription independent of its name. In other words, the new subscription has no relation to the old one, even though they have the same name. Therefore, the new subscription has no backlog at the time it is created (no messages waiting for delivery), even if the deleted subscription had a large number of unacknowledged messages.

Pulling messages from a subscription

For pull subscriptions, delivery is accomplished in two steps:

  1. Call the pull method to retrieve some number of unacknowledged messages. You can specify the maximum number of messages returned by one call by setting the maxMessages field. By default, the server keeps the connection open until at least one message is received. You can optionally set the returnImmediately field to true to prevent the subscriber from waiting if the queue is currently empty.
  2. Call the acknowledge method to indicate that the subscriber has finished processing the message and that the Pub/Sub server can delete it from the subscription queue. You can also optionally modify the ack deadline on a message to delay the acknowledgment.
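
The two steps above can be sketched at the protocol level without a client library. The helpers below are hypothetical names introduced for illustration: they only build the JSON bodies for the pull and acknowledge calls and decode a pull response of the shape shown in the REST example in this section. Sending the requests, authentication, and error handling are left out.

```python
import base64
import json


def build_pull_request(max_messages=10, return_immediately=False):
    """Body for the pull call; field names follow the REST example."""
    return {"returnImmediately": return_immediately, "maxMessages": max_messages}


def parse_pull_response(response_text):
    """Return (ack_ids, payloads) extracted from a pull response body.

    The data field of each message is base64-encoded, so it is decoded
    to bytes here. An empty response yields two empty lists.
    """
    response = json.loads(response_text)
    ack_ids, payloads = [], []
    for received in response.get("receivedMessages", []):
        ack_ids.append(received["ackId"])
        payloads.append(base64.b64decode(received["message"].get("data", "")))
    return ack_ids, payloads


def build_acknowledge_request(ack_ids):
    """Body for the acknowledge call, covering every processed message."""
    return {"ackIds": list(ack_ids)}
```

A subscriber would process each payload first and then send a single acknowledge request for all of the collected ack IDs.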

REST API

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": false,
  "maxMessages": 1
}


-- Response:
200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

-- Response:
200 OK

Java

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver = new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Python

from google.cloud import pubsub


def receive_message(topic_name, subscription_name):
    """Receives a message from a pull subscription."""
    pubsub_client = pubsub.Client()
    topic = pubsub_client.topic(topic_name)
    subscription = topic.subscription(subscription_name)

    # Change return_immediately=False to block until messages are
    # received.
    results = subscription.pull(return_immediately=True)

    print('Received {} messages.'.format(len(results)))

    for ack_id, message in results:
        print('* {}: {}, {}'.format(
            message.message_id, message.data, message.attributes))

    # Acknowledge received messages. If you do not acknowledge, Pub/Sub will
    # redeliver the message.
    if results:
        subscription.acknowledge([ack_id for ack_id, message in results])

Pushing messages to subscribers

The Pub/Sub server sends any messages for your subscription to the webhook address you have configured. Your webhook application needs to handle incoming messages and return an HTTP status code to indicate success. Any of the following HTTP status codes is interpreted as success by the Pub/Sub system: 200, 201, 204, or 102. If your service returns any other code, Google Cloud Pub/Sub retries delivering the message for up to the maximum retention time set for the subscription.

To regulate the rate of push message delivery, Google Cloud Pub/Sub uses a slow-start algorithm. With slow-start, Google Cloud Pub/Sub starts by sending a single message at a time, and doubles up with each successful delivery, until it reaches the maximum number of concurrent messages outstanding. Any time there is a delivery failure, the number of messages outstanding allowed for the subscription is halved.
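
The slow-start behavior described above can be illustrated with a small simulation. This is only a sketch of the described rule, not the service's actual implementation: the function name slow_start_windows, the default max_outstanding value, and the floor of one message after a failure are assumptions.

```python
def slow_start_windows(outcomes, max_outstanding=1000):
    """Simulate the allowed number of outstanding push messages.

    outcomes is a sequence of booleans, True for a successful delivery and
    False for a failure. The window starts at 1, doubles on success up to
    max_outstanding, and is halved on failure (never below 1, an assumption).
    Returns the window size after each outcome.
    """
    window = 1
    windows = []
    for success in outcomes:
        if success:
            window = min(window * 2, max_outstanding)
        else:
            window = max(window // 2, 1)
        windows.append(window)
    return windows
```

For example, with max_outstanding=8, three successes followed by a failure and another success give the windows 2, 4, 8, 4, 8.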

The specifics of coding your handler are dependent on your environment. Below is a protocol example for receiving push messages. Note that the message.data field is base64-encoded.

POST https://www.example.com/my-push-endpoint

{
  "message": {
    "attributes": {
      "key": "value"
    },
    "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
    "message_id": "136969346945"
  },
  "subscription": "projects/myproject/subscriptions/mysubscription"
}

-- Response
204 No Content

A response with a 204 status code is treated as an implicit acknowledgment.
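
The handling of a push request like the one above can be sketched as a pure function that a web framework handler could call. The name handle_push_request is hypothetical, and the choice to answer malformed requests with 400 is an assumption: because a non-success code causes redelivery, some applications may prefer to acknowledge malformed messages instead.

```python
import base64
import json


def handle_push_request(body):
    """Process a push request body and return (status_code, payload).

    Returns 204 with the decoded payload on success, which Pub/Sub treats
    as an acknowledgment. Returns 400 and None if the body cannot be parsed.
    """
    try:
        envelope = json.loads(body)
        message = envelope["message"]
        # message.data is base64-encoded in the push request.
        payload = base64.b64decode(message.get("data", ""))
    except (ValueError, KeyError, TypeError, AttributeError):
        return 400, None
    return 204, payload
```

The function does no I/O, so it can be wired into whichever HTTP server or framework hosts the endpoint.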

Stopping and Resuming Delivery

To suspend receiving messages for a subscription, set the push endpoint URL to an empty string, effectively converting the subscription to a pull one. The messages will accumulate, but will not be delivered. To resume receiving messages, set the URL to a valid endpoint again. You can do this with the subscription update method of the gcloud command-line tool, for example.

To permanently stop delivery, delete the subscription.

Configuring HTTP Endpoints

You need a publicly accessible HTTPS server that handles POST requests in order to receive push messages. The server must be reachable through a DNS name and must present a valid SSL certificate signed by a certificate authority. You also need to validate that you own the domain (or have equivalent access to the endpoint). Finally, you must register the endpoint domain with the GCP project. Note that these steps are considerably simplified on App Engine, where SSL certificates are provided and verification requirements can be relaxed.

Message Ordering

Google Cloud Pub/Sub provides a highly-available, scalable message delivery service. The tradeoff for having these properties is that the order in which messages are received by subscribers is not guaranteed. While the lack of ordering may sound burdensome, there are very few use cases that actually require strict ordering.

This document includes an overview of what order of messages really means and its tradeoffs, as well as a discussion of use cases and techniques for dealing with order-dependent messaging in your current workflow when moving to Google Cloud Pub/Sub.

What is ordering?

Defining message order can be complicated, depending on your publishers and subscribers. First of all, it is possible you have multiple subscribers processing messages on a single subscription:

In this case, even if messages come through the subscription in order, there are no guarantees of the order in which the messages will be processed by your subscribers. If order of processing is important, then subscribers would need to coordinate through some ACID storage system such as Cloud Datastore or Cloud SQL.

Similarly, multiple publishers on the same topic can make order difficult:

How do you assign an order to messages published from different publishers? Either the publishers themselves have to coordinate, or the message delivery service itself has to attach a notion of order to every incoming message. Each message would need to include the ordering information. The order information could be a timestamp (though it has to be a timestamp that all servers get from the same source in order to avoid issues of clock drift), or a sequence number (acquired from a single source with ACID guarantees). Other messaging systems that guarantee ordering of messages require settings that effectively limit the system to multiple publishers sending messages through a single server to a single subscriber.

The Message Delivery Service Node, Expanded

If the abstract message delivery service used in the examples above were a single, synchronous server, then the service itself could guarantee order. However, a message delivery service such as Google Cloud Pub/Sub is not a single server, neither in terms of server roles nor in terms of number of servers. In fact, there are layers between your publishers and subscribers and the Google Cloud Pub/Sub system itself. Here is a more detailed diagram of what is going on when Google Cloud Pub/Sub is the message delivery system:


As you can see, there are many paths a single message can take to get from the publisher to the subscriber. The benefit of such an architecture is that it is highly available (no single server outage results in systemwide delays) and scalable (messages can be distributed across many servers to maximize throughput). The benefits of distributed systems like this have been instrumental to Google products such as Search, Ads, and Gmail that build on top of the same systems that run Google Cloud Pub/Sub.

How Should I Handle Order?

Hopefully, you now understand why order of messages is fairly complex and why Google Cloud Pub/Sub de-emphasizes the need for order. To attain availability and scalability, it is important that you minimize your reliance on order. The reliance on order can take several forms, each described below with some typical use cases and solutions.

When order does not matter at all
Typical Use Cases: Queue of independent tasks, collection of statistics on events

Use cases where order does not matter at all are perfect for Google Cloud Pub/Sub. For example, if you have independent tasks that need to be performed by your subscribers, each task is a message where the subscriber that receives the message performs the action. As another example, if you want to collect statistics on all actions taken by clients on your server, you can publish a message for each event and then have your subscribers collate messages and update results in persistent storage.

When only order in the final result matters
Typical Use Cases: Logs, state updates

In use cases in this category, the order in which messages are processed does not matter; all that matters is that the end result is ordered properly. For example, consider a collated log that is processed and stored to disk. The log events come from multiple publishers. In this case, the actual order in which log events are processed does not matter; all that matters is that the end result can be accessed in a time-sorted manner. Therefore, you could attach a timestamp to every event in the publisher and make the subscriber store the messages in some underlying data store (such as Cloud Datastore) that allows storage or retrieval by the sorted timestamp.

The same option works for state updates that require access to only the most recent state. For example, consider keeping track of current prices for different stocks where one does not care about history, only the most recent value. You could attach a timestamp to each stock tick and only store ones that are more recent than the currently-stored value.
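
The stock-price example above can be sketched as a last-writer-wins update keyed by timestamp. The function name apply_tick and the in-memory dictionary are assumptions made for illustration; a real subscriber would apply the same comparison against its persistent store.

```python
def apply_tick(latest, symbol, timestamp, price):
    """Store a tick only if it is newer than the stored one for that symbol.

    latest maps symbol -> (timestamp, price). Returns True if the tick was
    stored and False if it was stale (or an equal-timestamp duplicate).
    """
    current = latest.get(symbol)
    if current is None or timestamp > current[0]:
        latest[symbol] = (timestamp, price)
        return True
    return False
```

Because stale ticks are ignored, the stored value converges to the most recent price regardless of the order in which messages arrive or how often they are redelivered.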

Order of processed messages matters
Typical Use Cases: Transactional data where thresholds must be enforced

Complete dependence on the order in which messages are processed is the most complicated case. Any solution that enforces strict ordering of messages is going to come at the expense of performance and throughput. You should only depend on order when it is absolutely necessary, and when you are sure that you won’t need to scale to a large number of messages per second. In order to process messages in order, a subscriber must either:

  • Know the entire list of outstanding messages and the order in which they must be processed, or

  • Have a way to determine from all messages it has currently received whether or not there are messages it has not yet received that it needs to process first.

You could implement the first option by assigning each message a unique identifier and storing in some persistent place (such as Cloud Datastore) the order in which messages should be processed. A subscriber would check the persistent storage to know the next message it must process and ensure that it only processes that message next, waiting to process other messages it has received when they come up in the complete ordering. At that point, it is worth considering using the persistent storage itself as the message queue and not relying on message delivering through Google Cloud Pub/Sub.

The latter is possible by using Cloud Monitoring to keep track of the pubsub.googleapis.com/subscription/oldest_unacked_message_age metric (see Supported Metrics for a description). A subscriber would temporarily put all messages in some persistent storage and ack the messages. It would periodically check the oldest unacked message age and check against the publish timestamps of the messages in storage. All messages published before the oldest unacked message are guaranteed to have been received, so those messages can be removed from persistent storage and processed in order.
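
The release step of this approach can be sketched as follows. The function name release_in_order and the list-of-tuples buffer are assumptions for illustration; reading the metric from the monitoring service is not shown, and in practice the metric may lag behind the true value, so a safety margin on the cutoff may be advisable.

```python
def release_in_order(buffered, oldest_unacked_age_seconds, now):
    """Split buffered messages into those safe to process and the rest.

    buffered is a list of (publish_time, payload) tuples, with times in
    seconds. Messages published before the oldest unacked message, that is
    before now - oldest_unacked_age_seconds, are returned sorted by
    publish time; the others stay buffered.
    """
    cutoff = now - oldest_unacked_age_seconds
    ready = sorted((m for m in buffered if m[0] < cutoff), key=lambda m: m[0])
    remaining = [m for m in buffered if m[0] >= cutoff]
    return ready, remaining
```

The subscriber would call this periodically, process the ready messages in the returned order, and keep the remaining ones in storage for the next check.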

Alternatively, if you have a single, synchronous publisher and a single subscriber, you could use a sequence number to ensure ordering. This approach requires the use of a persistent counter.
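
The sequence-number approach can be sketched on the subscriber side as a reorder buffer. The class name SequenceReorderer is hypothetical, and for brevity the counter and the pending messages are kept in memory, whereas the text above calls for a persistent counter. The sketch assumes the publisher attaches consecutive integers starting at first_sequence.

```python
class SequenceReorderer:
    """Releases payloads strictly in sequence-number order.

    Hypothetical helper for illustration; state is in memory only.
    """

    def __init__(self, first_sequence=0):
        self._next = first_sequence   # next sequence number to process
        self._pending = {}            # out-of-order messages held back

    def accept(self, sequence, payload):
        """Return the payloads that can now be processed, in order."""
        if sequence < self._next:
            return []  # duplicate of a message that was already released
        self._pending[sequence] = payload
        ready = []
        while self._next in self._pending:
            ready.append(self._pending.pop(self._next))
            self._next += 1
        return ready
```

A message that arrives early is held until every lower sequence number has been seen, and a redelivered message with an already released number is ignored.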

Performance and Scaling

Pulling and acknowledging one message at a time can be inefficient. Typically, multiple messages are returned in a single pull response, and multiple ack IDs are sent in a single acknowledge request. Further, to maximize throughput, many pull calls are executed asynchronously and in parallel. You can achieve the right balance of throughput, efficiency, and latency by adjusting the number of messages or ack IDs per request and by using simultaneous pull requests. Different client libraries offer different means of configuring these parameters.
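
Sending several ack IDs per acknowledge request can be sketched with a small batching helper. The function name batch_ack_ids and the batch size are assumptions; a suitable size depends on the client library and on request size limits that are not covered here.

```python
def batch_ack_ids(ack_ids, batch_size):
    """Split a list of ack IDs into lists of at most batch_size items.

    Each returned list can be sent in one acknowledge request.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [ack_ids[i:i + batch_size] for i in range(0, len(ack_ids), batch_size)]
```

Larger batches reduce the number of requests, while smaller batches acknowledge messages sooner; the balance is the same throughput and latency trade-off described above.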

If you must use synchronous pull calls, you have to decide at what frequency to poll, and whether to wait for messages to arrive before returning an empty response. Higher polling frequency with the returnImmediately option minimizes message latency at the expense of requests that return no messages. Waiting for messages to arrive blocks the thread executing the request. This is particularly problematic on App Engine 1.9, where we recommend push subscribers. Allowing for a high number of messages to be returned maximizes the efficiency of processing.

You may need to implement a scaling mechanism for your subscriber application to keep up with message volume. How to do this depends on your environment, but it will generally be based on backlog metrics offered through the Stackdriver Monitoring service. For details on how to do this for Google Compute Engine, see Scaling based on Cloud Monitoring Metrics.
