File on Arrival Mechanism for GCS
When a file is uploaded to the GCS bucket, it can trigger a notification to the specified Pub/Sub topic. This is done by configuring notification settings on the bucket to send messages for specific events (like OBJECT_FINALIZE, which indicates a new object has been created).
Subscribers to the Pub/Sub topic will receive messages containing details about the uploaded file, which can then be processed accordingly.
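As an illustration of what a subscriber might do with such a message, the sketch below summarizes one notification. It assumes the message arrives as a dictionary of attributes plus a bytes payload, and that the notification uses the JSON_API_V1 payload format. The function name parse_gcs_notification and the sample bucket and object names are hypothetical, and this is not the platform's own message handling.

```python
import json


def parse_gcs_notification(attributes, data):
    """Summarize one GCS notification delivered through Pub/Sub.

    attributes: dict of message attributes (eventType, bucketId, objectId, ...)
    data: bytes payload; JSON object metadata when payloadFormat is JSON_API_V1
    """
    payload = {}
    if attributes.get("payloadFormat") == "JSON_API_V1" and data:
        payload = json.loads(data.decode("utf-8"))
    return {
        "event_type": attributes["eventType"],
        "bucket": attributes["bucketId"],
        "object": attributes["objectId"],
        # The JSON API reports object size as a string, so convert it here.
        "size": int(payload["size"]) if "size" in payload else None,
    }


# Hypothetical sample message, for illustration only.
sample_attributes = {
    "eventType": "OBJECT_FINALIZE",
    "bucketId": "example-bucket",
    "objectId": "inbox/readings.csv",
    "payloadFormat": "JSON_API_V1",
}
sample_data = json.dumps(
    {"name": "inbox/readings.csv", "bucket": "example-bucket", "size": "2048"}
).encode("utf-8")

summary = parse_gcs_notification(sample_attributes, sample_data)
print(summary)
```

Reading the event type and object name from the attributes means a subscriber can decide whether a message is relevant before decoding the payload.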
You can set up a file-on-arrival mechanism using Google Cloud Pub/Sub with Google Cloud Storage (GCS). Here is a breakdown of the steps and considerations involved:
Prepare Pub/Sub service
Service Availability: Ensure that the Pub/Sub service is already set up in your GCP project.
Configuration Override:
- You need to modify the configuration files located in the cloudMessageDispatcher folder.
- Use GcpBucketEvent.json to specify your GCP Subscription.
- Use GcpBucketRegistration.json to specify your GCP Topic.
Permissions: Ensure that the GCP credentials you are using have the following roles:
- GCS Bucket Admin Role: This allows you to manage the GCS bucket.
- GCP Pub/Sub Admin Role: This grants permissions to manage Pub/Sub topics and subscriptions.
Key considerations
- Event Types — Make sure to configure the bucket to listen for the right events.
- Testing — After setting everything up, conduct tests to ensure that your app receives notifications as expected.
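The two considerations above can be sketched as a small check that keeps only new-file events, which is also a convenient thing to exercise in a test. The four event type names are the ones Cloud Storage defines for Pub/Sub notifications. The helper names is_new_file_event and new_file_objects and the sample events are hypothetical.

```python
# Event types that Cloud Storage can publish for a bucket.
GCS_EVENT_TYPES = frozenset(
    {"OBJECT_FINALIZE", "OBJECT_METADATA_UPDATE", "OBJECT_DELETE", "OBJECT_ARCHIVE"}
)


def is_new_file_event(attributes):
    """Return True only for OBJECT_FINALIZE, the event for a newly created object."""
    event_type = attributes.get("eventType")
    if event_type not in GCS_EVENT_TYPES:
        raise ValueError(f"unexpected eventType: {event_type!r}")
    return event_type == "OBJECT_FINALIZE"


def new_file_objects(events):
    """Collect the object names of all new-file events, in arrival order."""
    return [event["objectId"] for event in events if is_new_file_event(event)]


# Hypothetical events, standing in for messages received during a test run.
sample_events = [
    {"eventType": "OBJECT_FINALIZE", "objectId": "inbox/a.csv"},
    {"eventType": "OBJECT_DELETE", "objectId": "inbox/old.csv"},
    {"eventType": "OBJECT_METADATA_UPDATE", "objectId": "inbox/a.csv"},
]

new_files = new_file_objects(sample_events)
print(new_files)
```

Raising on an unknown event type, rather than silently ignoring it, makes a misconfigured notification visible during testing.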
Handle incoming files
Set up the C3 AI infrastructure to handle incoming files in your GCS bucket. By defining the FileSourceSystem and FileSourceCollection, you ensure that your application can effectively respond to file uploads and trigger any necessary processing workflows.
If the FileSourceSystem and FileSourceCollection are already set up, you can skip the creation steps. However, if you need to enable or change the directory for the onArrival mode, you can update the existing configuration directly rather than creating a new collection. Use the method FileSourceCollection.forName() to access your existing collection. This allows you to work with the already established setup without unnecessary duplication.
Start onArrival mode
In this step, you initiate the onArrival mode for your FileSourceCollection (fsc) by running fsc.startOnArrivalMode(). This function internally calls FileSystem.registerNotification(this.inboxUrl()) and registers the specified GCS bucket or directory to listen for file changes.
Once registered, any change within the specified directory, such as a file upload, triggers a series of events. These events are sent to the Pub/Sub topic defined in your GcpBucketRegistration.json file.
By starting the onArrival mode, your application can react to file uploads in real time, allowing for immediate processing or workflows based on the new files. This setup ensures that your file changes are communicated effectively through Google Cloud Pub/Sub, enabling scalable and decoupled processing of events.
This step is crucial for automating workflows that depend on file uploads, making your application more responsive and efficient.
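To make the idea of registering a bucket or directory concrete, the sketch below splits a gs:// URL into a bucket and a directory prefix and checks whether an uploaded object falls under it. This only illustrates directory scoping. How the platform matches events internally is not described here, and the helper names split_gcs_url and is_under_inbox and the sample URLs are hypothetical.

```python
def split_gcs_url(url):
    """Split 'gs://bucket/dir' into (bucket, 'dir/'); the prefix is '' for a whole bucket."""
    scheme = "gs://"
    if not url.startswith(scheme):
        raise ValueError(f"not a GCS URL: {url!r}")
    bucket, _, prefix = url[len(scheme):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name: {url!r}")
    # Normalize the directory so 'inbox' does not also match 'inbox-old/...'.
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return bucket, prefix


def is_under_inbox(inbox_url, bucket_id, object_id):
    """Return True if the object lives in the registered bucket and directory."""
    bucket, prefix = split_gcs_url(inbox_url)
    return bucket_id == bucket and object_id.startswith(prefix)


# Hypothetical inbox URL and uploads, for illustration only.
inbox = "gs://example-bucket/inbox"
print(is_under_inbox(inbox, "example-bucket", "inbox/readings.csv"))
print(is_under_inbox(inbox, "example-bucket", "inbox-old/readings.csv"))
```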
Register CloudInboundMessage
In this step, you register a CloudInboundMessage to facilitate the flow of messages from your Google Cloud Pub/Sub setup. Here is a detailed explanation of the process:
Overview of Pub/Sub data flow
- The data flow in Pub/Sub includes the following components:
- Publisher — The entity that sends messages (in this case, your GCS bucket)
- Topic — The channel through which messages are sent
- Subscription — The mechanism that allows subscribers to receive messages from a topic
- Subscriber — The entity that receives messages from the subscription
- When you call fsc.startOnArrivalMode(), it marks the GCS bucket as a publisher. This means that any events (like file uploads) in the bucket automatically publish messages to the specified topic.
- Once the topic receives messages, it is GCP's responsibility to manage how these messages are delivered to the subscription.
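The roles above can be sketched with a toy in-memory model. It is a deliberate simplification and not the Pub/Sub client library: there is no acknowledgement, retry, or network, and pulling a message simply removes it. The class names Bucket, Topic, and Subscription and all resource names are hypothetical.

```python
from collections import deque


class Subscription:
    """Holds messages until a subscriber pulls them."""

    def __init__(self, name):
        self.name = name
        self._pending = deque()

    def deliver(self, message):
        self._pending.append(message)

    def pull(self, max_messages=10):
        """Remove and return up to max_messages pending messages."""
        batch = []
        while self._pending and len(batch) < max_messages:
            batch.append(self._pending.popleft())
        return batch


class Topic:
    """Fans each published message out to every attached subscription."""

    def __init__(self, name):
        self.name = name
        self.subscriptions = []

    def attach(self, subscription):
        self.subscriptions.append(subscription)

    def publish(self, message):
        for subscription in self.subscriptions:
            subscription.deliver(message)


class Bucket:
    """Acts as a publisher only after a notification has been registered."""

    def __init__(self, name):
        self.name = name
        self.topic = None

    def register_notification(self, topic):
        self.topic = topic

    def upload(self, object_id):
        if self.topic is not None:
            self.topic.publish(
                {"eventType": "OBJECT_FINALIZE", "bucketId": self.name, "objectId": object_id}
            )


topic = Topic("example-topic")
subscription = Subscription("example-subscription")
topic.attach(subscription)

bucket = Bucket("example-bucket")
bucket.upload("inbox/ignored.csv")   # no notification registered yet, nothing is published
bucket.register_notification(topic)  # the bucket now acts as a publisher
bucket.upload("inbox/readings.csv")

received = subscription.pull()       # the subscriber pulls from the subscription
print(received)
```

The upload made before registration produces no message, which mirrors why the onArrival mode must be started before files are expected to trigger processing.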
Registering the subscriber
To specify the subscriber and enable it to pull messages from the subscription, run the following command:
CloudInboundMessage.downcast(GcpBucketEvent.myType()).register();
This line registers the specific type of message, in this case GcpBucketEvent, that your application handles. By doing this, you are essentially notifying the FileSourceCollection to consume messages of that type.
After registration, the CloudInvalidationQueue can periodically pull messages from the GCP subscription. This allows your application to handle incoming messages efficiently, ensuring that any events related to file uploads are processed as they occur.
Summary
This step is crucial for establishing a complete message handling system within your application. By registering the CloudInboundMessage, you ensure that your application can respond to events triggered by file uploads in your GCS bucket, enabling automated workflows and real-time processing.
Teardown
In this final step, you clean up your setup by stopping the onArrival mode and deregistering the associated resources. Here is how it works:
Stop onArrival mode
You initiate the teardown process by running fsc.stopOnArrivalMode(). This function deregisters the URL you provided in the FileSourceCollection. Essentially, it stops the GCS bucket from publishing events to the Pub/Sub topic, effectively halting any further notifications about file changes.
Clear the CloudInvalidationQueue
Next, you need to ensure that any pending messages or events are cleared from the queue by running CloudInboundMessage.downcast(GcpBucketEvent.myType()).register().
This command clears the CloudInvalidationQueue associated with the GcpBucketEvent. By registering this type, you ensure that the system is aware that it should no longer expect messages of this type, effectively cleaning up any lingering references or processes related to the bucket events.
This teardown step is crucial for maintaining a clean and efficient system. By stopping the onArrival mode and clearing the message queue, you prevent unnecessary processing and resource usage, ensuring that your application remains responsive and well-managed.