gcp pubsub - ghdrako/doc_snipets GitHub Wiki

Pub/Sub Concepts

  • Publishers
  • Message attributes: optional pieces of information that the publisher may send along with the actual message to the topic and subscribers. A message attribute is a key-value pair, and developers can define custom attributes when publishing messages.

Important note: a message must contain either non-empty data or at least one attribute. If a message has no data, it must carry an attribute in order to be published.
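The rule in the note can be sketched as a small check. This is only a local illustration: `is_publishable` is a hypothetical helper, not part of any Pub/Sub client library.

```python
from typing import Dict, Optional


def is_publishable(data: bytes = b"", attributes: Optional[Dict[str, str]] = None) -> bool:
    """Return True if the message has non-empty data or at least one attribute."""
    return bool(data) or bool(attributes)


print(is_publishable(data=b"Hello"))                      # data only
print(is_publishable(attributes={"origin": "sensor-1"}))  # attribute only
print(is_publishable())                                   # neither: not publishable
```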

  • Topics
  • Subscriptions

A subscriber can receive messages from a subscription in two ways. With the push method, the service pushes each message to the subscriber's endpoint. With the pull method, the subscriber application pulls messages from Cloud Pub/Sub (that is, from the subscription itself). The two methods also differ in how messages are acknowledged: a push subscription treats a success HTTP status such as 200 from the endpoint as the acknowledgement, whereas in the pull model subscribers must acknowledge messages explicitly.

Pub/Sub model – fan-in and fan-out

  • fan-in design: multiple message sources or publishers send messages to a single consumer or subscriber
  • fan-out design: a single publisher sends messages to multiple subscribers
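The two designs can be pictured with a toy in-memory model. The `Topic` class below is a hypothetical stand-in written for this illustration; it does not use or imitate the Pub/Sub client API.

```python
class Topic:
    """In-memory stand-in for a topic: every subscription gets its own copy."""

    def __init__(self):
        self.subscriptions = {}  # subscription name -> list of (publisher, message)

    def subscribe(self, name):
        self.subscriptions.setdefault(name, [])

    def publish(self, publisher, message):
        for queue in self.subscriptions.values():
            queue.append((publisher, message))


# Fan-in: several publishers, one subscriber.
fan_in = Topic()
fan_in.subscribe("aggregator")
for sensor in ("sensor-a", "sensor-b", "sensor-c"):
    fan_in.publish(sensor, "reading")

# Fan-out: one publisher, several subscribers.
fan_out = Topic()
for name in ("billing", "audit"):
    fan_out.subscribe(name)
fan_out.publish("orders", "order-created")

print(len(fan_in.subscriptions["aggregator"]))  # 3 messages reach the single subscriber
print(sorted(fan_out.subscriptions))            # both subscriptions received a copy
```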

Client libraries are available in eight languages: Python, C++, C#, Go, Java, Node.js, PHP, and Ruby.

Cloud Pub/Sub has a maximum message size of 10 megabytes (MB) with no upper limit on maximum delivery rate

gcloud services enable pubsub.googleapis.com
gcloud pubsub topics create myTopic
gcloud pubsub topics list
gcloud pubsub topics delete Test1
gcloud pubsub subscriptions create --topic myTopic mySubscription
gcloud pubsub topics list-subscriptions myTopic
gcloud pubsub subscriptions delete Test1
gcloud pubsub topics publish myTopic --message "Hello"
gcloud pubsub subscriptions pull mySubscription --auto-ack


Running the pull command without any flags outputs only one message, even if the subscription holds more. Once a message has been pulled and acknowledged (for example with --auto-ack), you cannot access it again with the pull command.

gcloud pubsub topics publish myTopic --message "Publisher is starting to get the hang of Pub/Sub"
gcloud pubsub topics publish myTopic --message "Publisher wonders if all messages will be pulled"
gcloud pubsub topics publish myTopic --message "Publisher will have to test to find out"

gcloud pubsub subscriptions pull mySubscription --auto-ack --limit=3
  • --auto-ack is a flag that acknowledges each pulled message automatically, so that it is not delivered again. The neat boxes that your pulled messages appear in come from gcloud's default table output, not from this flag.

  • --limit is another flag; it sets an upper limit on the number of messages to pull.

gcloud pubsub topics add-iam-policy-binding iot-topic \
--member=serviceAccount:[email protected] --role='roles/pubsub.publisher'

Schemas

A Pub/Sub schema defines the names and data types of fields in a message's data field. You can create schemas as standalone versioned resources, associate schemas with multiple Pub/Sub topics, and use them to validate the structure of published messages.

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

Schema types

You can create a schema using the following frameworks:

  • Apache Avro
  • Protocol Buffers

#Avro
{
 "type" : "record",
 "name" : "Avro",
 "fields" : [
   {
     "name" : "StringField",
     "type" : "string"
   },
   {
     "name" : "FloatField",
     "type" : "float"
   },
   {
     "name" : "BooleanField",
     "type" : "boolean"
   }
 ]
}
# Protocol Buffer
syntax = "proto3";
message ProtocolBuffer {
  string string_field = 1;
  float float_field = 2;
  bool boolean_field = 3;
}
gcloud pubsub schemas describe SCHEMA_NAME # Getting schema details
gcloud pubsub schemas list # Listing schemas
gcloud pubsub schemas delete SCHEMA_NAME # Deleting schemas
# Validating message schemas
gcloud pubsub schemas validate-message \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION \
        --message-encoding=MESSAGE_ENCODING \
        --message=MESSAGE
# validate a message against an existing schema
gcloud pubsub schemas validate-message \
        --message-encoding=MESSAGE_ENCODING \
        --message=MESSAGE \
        --schema-name=SCHEMA_NAME
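To make the idea of validating a message against a schema concrete, here is a deliberately simplified sketch that checks a JSON-encoded message against the three-field Avro record shown earlier. It is an assumption-laden illustration: `validate` and `_matches` are hypothetical helpers, only the `string`, `float`, and `boolean` types are covered, and real validation is performed by Pub/Sub (or a full Avro library), not by code like this.

```python
import json

# The Avro record from this section, as valid JSON.
AVRO_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "Avro",
  "fields": [
    {"name": "StringField", "type": "string"},
    {"name": "FloatField", "type": "float"},
    {"name": "BooleanField", "type": "boolean"}
  ]
}
""")


def _matches(avro_type, value):
    """Check one value against one of the primitive types used above."""
    if avro_type == "string":
        return isinstance(value, str)
    if avro_type == "boolean":
        return isinstance(value, bool)
    if avro_type == "float":
        # bool is a subclass of int in Python, so exclude it explicitly.
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    raise ValueError(f"type not covered by this sketch: {avro_type}")


def validate(message_json, schema=AVRO_SCHEMA):
    """Return True if the message has exactly the schema's fields with matching types."""
    message = json.loads(message_json)
    if not isinstance(message, dict):
        return False
    fields = {field["name"]: field["type"] for field in schema["fields"]}
    if set(message) != set(fields):
        return False
    return all(_matches(fields[name], value) for name, value in message.items())


print(validate('{"StringField": "a", "FloatField": 1.5, "BooleanField": true}'))
print(validate('{"StringField": "a", "FloatField": "oops", "BooleanField": true}'))
```

The first call passes and the second fails because `FloatField` carries a string.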

Create a topic

gcloud pubsub topics create my-topic

Add a subscription

gcloud pubsub subscriptions \
    create my-sub --topic my-topic \
    --ack-deadline=60

List topics and subscriptions

gcloud pubsub topics list
gcloud pubsub subscriptions list

Publish messages to the topic

gcloud pubsub topics publish my-topic --message hello
gcloud pubsub topics publish my-topic --message goodbye

When you successfully publish a message, you should see the messageId returned from the server. This is a unique id automatically assigned by the server to each message.

Pull messages from the subscription

gcloud pubsub subscriptions \
    pull --auto-ack --limit=2 my-sub

Note: Cloud Pub/Sub doesn't guarantee the order of the messages. It is also possible that you saw only one message. In that case, try running the same command several times until you see the other message.

Acknowledge and acknowledgement deadline

After you pull a message and correctly process it, you must notify Cloud Pub/Sub that you successfully received the message. This action is called acknowledgement.

You may have noticed the --auto-ack flag passed along with the pull command. The --auto-ack flag automatically acknowledges every message that the command pulls.

Manual acknowledgement

gcloud pubsub \
    topics publish my-topic --message thanks  # Send a new message
gcloud pubsub subscriptions \
    pull my-sub                               # Pull messages again

This should display the thanks message, as well as MESSAGE_ID, and ACK_ID. The ACK_ID is another id that you can use for acknowledging the message.

Acknowledge the message

After you pull a message, you need to acknowledge it before the acknowledgement deadline has passed. For example, if a subscription is configured with a 60-second acknowledgement deadline, as we did in this tutorial, you need to acknowledge the message within 60 seconds of pulling it. Otherwise, Cloud Pub/Sub will re-send the message.
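The deadline behaviour can be pictured with a toy model that uses explicit clock values instead of real time. The `Subscription` class is a hypothetical local simulation, not the Pub/Sub API; it only mirrors the rule that a message not acknowledged within the deadline becomes available for delivery again.

```python
import itertools


class Subscription:
    """Toy model of a pull subscription with an acknowledgement deadline."""

    def __init__(self, ack_deadline_seconds):
        self.ack_deadline = ack_deadline_seconds
        self.backlog = []      # messages waiting to be delivered
        self.outstanding = {}  # ack_id -> (message, time it was pulled)
        self._ids = itertools.count(1)

    def publish(self, message):
        self.backlog.append(message)

    def _expire(self, now):
        # Messages not acknowledged in time go back to the backlog.
        for ack_id, (message, pulled_at) in list(self.outstanding.items()):
            if now - pulled_at > self.ack_deadline:
                del self.outstanding[ack_id]
                self.backlog.append(message)

    def pull(self, now):
        self._expire(now)
        if not self.backlog:
            return None
        message = self.backlog.pop(0)
        ack_id = f"ack-{next(self._ids)}"
        self.outstanding[ack_id] = (message, now)
        return ack_id, message

    def ack(self, ack_id, now):
        """Return True if the acknowledgement arrived before the deadline."""
        self._expire(now)
        return self.outstanding.pop(ack_id, None) is not None


sub = Subscription(ack_deadline_seconds=60)
sub.publish("thanks")

ack_id, _ = sub.pull(now=0)
late = sub.ack(ack_id, now=61)      # too late: the deadline was 60 seconds
ack_id, message = sub.pull(now=62)  # the same message is delivered again
on_time = sub.ack(ack_id, now=70)   # acknowledged within the deadline
print(late, message, on_time)       # False thanks True
```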

gcloud pubsub subscriptions ack \
    my-sub --ack-ids ACK_ID
gcloud beta pubsub subscriptions pull MY_SUB --format="json(ackId, message.attributes, message.data.decode(\"base64\").decode(\"utf-8\"), message.messageId, message.publishTime)"
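The message data in JSON output is base64-encoded, which is why the format expression above decodes it. As a rough alternative, the same decoding can be done after the fact in Python. The `SAMPLE` payload below is hypothetical and only assumes the `ackId`, `message.data`, and `message.messageId` fields that the command above refers to; `decode_messages` is an illustrative helper.

```python
import base64
import json

# Hypothetical sample shaped like the JSON output of a pull with one message.
SAMPLE = json.dumps([
    {
        "ackId": "EXAMPLE_ACK_ID",
        "message": {
            "data": base64.b64encode("thanks".encode("utf-8")).decode("ascii"),
            "messageId": "1234567890",
            "publishTime": "2024-01-01T00:00:00Z",
        },
    }
])


def decode_messages(raw_json):
    """Return (ack_id, message_id, decoded_text) for each received message."""
    decoded = []
    for received in json.loads(raw_json):
        message = received["message"]
        # A message may carry only attributes, in which case data is absent.
        text = base64.b64decode(message.get("data", "")).decode("utf-8")
        decoded.append((received["ackId"], message["messageId"], text))
    return decoded


print(decode_messages(SAMPLE))  # [('EXAMPLE_ACK_ID', '1234567890', 'thanks')]
```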

Access Control

In Pub/Sub, access control can be configured at the project level and at the individual resource level (that is, access can be granted on a per-topic or per-subscription basis).

Get subscription/topic policy

gcloud pubsub subscriptions get-iam-policy \
   projects/${PROJECT}/subscriptions/${SUBSCRIPTION} \
   --format json

gcloud pubsub topics get-iam-policy \
    projects/${PROJECT}/topics/${TOPIC} \
    --format json
## save policy to file
gcloud pubsub subscriptions get-iam-policy \
   projects/${PROJECT}/subscriptions/${SUBSCRIPTION} \
   --format json > subscription_policy.json

Apply policy

gcloud pubsub subscriptions set-iam-policy \
  projects/${PROJECT}/subscriptions/${SUBSCRIPTION} \
  subscription_policy.json

gcloud pubsub topics set-iam-policy  \
   projects/${PROJECT}/topics/${TOPIC}     \
   topic_policy.json
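Between saving a policy and applying it, the JSON file usually needs a new binding. A minimal sketch of that edit follows, assuming the usual IAM policy shape of a `bindings` list of `role`/`members` objects. The `add_binding` helper, the sample policy, its `etag` value, and the member names are all hypothetical, and conditional bindings are not handled.

```python
import json


def add_binding(policy, role, member):
    """Return a copy of `policy` in which `member` holds `role`."""
    updated = json.loads(json.dumps(policy))  # deep copy via a JSON round trip
    bindings = updated.setdefault("bindings", [])
    for binding in bindings:
        if binding.get("role") == role:
            members = binding.setdefault("members", [])
            if member not in members:
                members.append(member)
            return updated
    bindings.append({"role": role, "members": [member]})
    return updated


# Hypothetical policy, shaped like the contents of subscription_policy.json.
policy = {
    "bindings": [
        {"role": "roles/pubsub.subscriber", "members": ["user:alice@example.com"]}
    ],
    "etag": "EXAMPLE_ETAG",
    "version": 1,
}

updated = add_binding(policy, "roles/pubsub.subscriber", "user:bob@example.com")
updated = add_binding(updated, "roles/pubsub.viewer", "user:carol@example.com")
print(json.dumps(updated, indent=2))
```

The `etag` field is kept unchanged, so the file can still be passed back to set-iam-policy as the version that was read.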

Test policy

gcloud iam list-testable-permissions \
   https://pubsub.googleapis.com/v1/projects/${PROJECT}/subscriptions/${SUBSCRIPTION} \
   --format json

gcloud iam list-testable-permissions \
   https://pubsub.googleapis.com/v1/projects/${PROJECT}/topics/${TOPIC} \
   --format json

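Dead-letter topic

To forward undeliverable messages, the Pub/Sub service account needs the publisher role on the dead-letter topic:

# PROJECT_NUMBER must hold the numeric project number
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub topics add-iam-policy-binding dead-letter-topic-id \
    --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT" \
    --role="roles/pubsub.publisher"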

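Then grant the subscriber role on the source subscription, so that Pub/Sub can acknowledge the messages it forwards:

# PROJECT_NUMBER must hold the numeric project number
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub subscriptions add-iam-policy-binding subscription-id \
    --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT" \
    --role="roles/pubsub.subscriber"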

Purging a subscription or replaying messages (seek, snapshot)

gcloud pubsub subscriptions seek <subscription_name> --time=$(date +%Y-%m-%dT%H:%M:%S) --project <project_name>

Message retention

By default, a Pub/Sub topic discards messages as soon as they are acknowledged by all subscriptions attached to the topic. Configuring a topic with message retention gives you more flexibility, allowing any subscription attached to the topic to seek back in time and replay previously acknowledged messages up to the topic's message_retention_duration.

gcloud pubsub topics create TOPIC_ID --message-retention-duration=7d
gcloud pubsub topics update TOPIC_ID --message-retention-duration=1d
gcloud pubsub topics update TOPIC_ID --clear-message-retention-duration

Subscription message retention

By default, Pub/Sub discards a message from a subscription as soon as the message is acknowledged. Unacknowledged messages are retained for a default of 7 days (configurable by the subscription's message_retention_duration property).

Messages can be retained in a subscription for more than 7 days if the message retention duration configured on its topic is greater than 7 days.

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
    --topic=TOPIC_ID \
    --retain-acked-messages \
    --message-retention-duration=5d

Note: If you omit the --message-retention-duration parameter, acked and unacked messages will be retained for the default duration of 7 days.

gcloud pubsub subscriptions update SUBSCRIPTION_ID --message-retention-duration=1d
gcloud pubsub subscriptions update SUBSCRIPTION_ID --no-retain-acked-messages

Seek to a timestamp

You can perform the following types of seek operations based on timestamps:

  • To purge all messages, you can seek to a time in the future.
  • To replay and reprocess previously acknowledged messages, seek to a time in the past.
gcloud pubsub subscriptions seek SUBSCRIPTION_ID --time=TIME
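As an illustration of choosing the TIME value, the sketch below builds UTC timestamps relative to the current moment: a negative offset gives a time in the past (replay), a positive one a time in the future (purge). `seek_time` is a hypothetical helper, and the `YYYY-MM-DDTHH:MM:SSZ` layout is an assumption about a format the flag accepts, so check `gcloud topic datetimes` for the authoritative list.

```python
from datetime import datetime, timedelta, timezone


def seek_time(offset, now=None):
    """Format `now + offset` as a UTC timestamp such as 2024-01-01T11:00:00Z."""
    now = now or datetime.now(timezone.utc)
    target = (now + offset).astimezone(timezone.utc)
    return target.strftime("%Y-%m-%dT%H:%M:%SZ")


# A fixed reference time keeps the example reproducible.
fixed_now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

print(seek_time(timedelta(hours=-1), fixed_now))  # replay the last hour: 2024-01-01T11:00:00Z
print(seek_time(timedelta(days=1), fixed_now))    # purge everything:     2024-01-02T12:00:00Z
```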

Snapshot

A snapshot captures the message acknowledgement state of a subscription at a given point in time. For example, you might create a snapshot when deploying new subscriber code, in case you need to recover from unexpected or erroneous acknowledgements.

gcloud pubsub snapshots create SNAPSHOT_ID \
    --project=PROJECT_ID \
    --subscription=SUBSCRIPTION_ID

gcloud pubsub subscriptions seek SUBSCRIPTION_ID \
    --snapshot=SNAPSHOT_ID
Ordering (FIFO functionality)

An ordering key lets you ensure that messages sharing that key are processed in sequence: the next message is not delivered until the previous one has been acknowledged. Message ordering is enabled on the subscription:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-message-ordering
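The behaviour described for ordering keys can be mimicked with a small in-memory model: messages with the same key are released one at a time, and keys do not block each other. `OrderedSubscription` is a hypothetical simulation written for this note, not the Pub/Sub API, and the key names are made up.

```python
from collections import defaultdict, deque


class OrderedSubscription:
    """Toy model: one in-flight message per ordering key."""

    def __init__(self):
        self.queues = defaultdict(deque)  # ordering key -> pending messages
        self.in_flight = {}               # ordering key -> message awaiting ack

    def publish(self, key, message):
        self.queues[key].append(message)

    def pull(self, key):
        # Hold back the next message until the previous one for this key is acked.
        if key in self.in_flight or not self.queues[key]:
            return None
        message = self.queues[key].popleft()
        self.in_flight[key] = message
        return message

    def ack(self, key):
        self.in_flight.pop(key, None)


sub = OrderedSubscription()
for step in ("created", "paid", "shipped"):
    sub.publish("order-42", step)
sub.publish("order-7", "created")

first = sub.pull("order-42")    # "created"
blocked = sub.pull("order-42")  # None: "created" has not been acknowledged yet
other = sub.pull("order-7")     # "created": a different key is not blocked
sub.ack("order-42")
second = sub.pull("order-42")   # "paid"
print(first, blocked, other, second)
```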

Cloud Storage subscription

You can now create Cloud Storage subscriptions in Pub/Sub to write messages directly to an existing Cloud Storage bucket.

BigQuery subscription

Exactly-once delivery

Kafka connector