Our AWS Simple Storage Service (S3) destination lets you build a data lake simply and inexpensively, following the separation of storage and processing that is common in modern databases.
By creating S3 as a destination at Kondado, you can ETL (or ELT) data from over 70 sources and use virtualization technologies such as Presto, Athena, Dremio and Redshift Spectrum to run queries on your data.
Because S3 separates processing from storage, our models do not apply to this destination. Additionally, data aggregation and deduplication (UPSERT) operations must be performed by you, either in the virtualization layer or with scripts (for example, AWS Glue jobs). To help you with this process, this destination can send notifications to URLs that will trigger those processes.
Below we will explain in detail how our S3 destination works and how to add it to the Kondado platform.
1. Creating the access key
The access key created must have the following permissions:
- s3:PutObject
- s3:GetObject
- s3:ListBucket
- s3:DeleteObject
You can use the policy below as an example to grant access to a specific bucket. Remember to replace the placeholder “CHANGE-THIS-WITH-BUCKET-NAME” with the actual name of your bucket.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "kondadorecommendeds3destpolicy20220111",
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:DeleteObject",
        "s3:ListMultipartUploadParts",
        "s3:PutObject",
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::CHANGE-THIS-WITH-BUCKET-NAME/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketLocation",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": [
        "arn:aws:s3:::CHANGE-THIS-WITH-BUCKET-NAME"
      ]
    }
  ]
}
- In Key Id and Key Secret, fill in the key information generated earlier
- In Bucket, enter only the name of your existing bucket, without any URL formatting: “my-bucket” is correct and “s3://my-bucket” is wrong. Also use only the root name of the bucket, with no child folders: “my-bucket/my-folder” is wrong. If you want the files placed in an additional folder, do so by filling in the table name in the integration
2. Output file format
We currently send files in the following formats:
2.1 JSON (jsonl)
This format saves one JSON record per line of the file, with quotes escaped by \. This is an example of a jsonl file that will be uploaded to your data lake:
{"\"keyword\"":"y","\"url\"":"https:\/\/mailchi.mp\/a37895aff5ac\/subscribe_newsletter","\"title\"":"https:\/\/mailchi.mp\/a37895aff5ac\/subscribe_newsletter","\"timestamp\"":1572559835000,"\"clicks\"":1}
{"\"keyword\"":"z","\"url\"":"https:\/\/pipedrivewebforms.com\/form\/43fd725e54683d565a44ed4872c3560f4639452","\"title\"":"Kondado - contato","\"timestamp\"":1572560617000,"\"clicks\"":2}
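A minimal Python sketch for reading such a jsonl file line by line. The key normalization (stripping the literal surrounding quotes visible in the sample lines above) is our assumption based on those samples, not a documented contract:

```python
import json

def parse_jsonl_line(line: str) -> dict:
    """Parse one line of a jsonl file as shown in the example above.

    In the sample lines, keys carry literal surrounding quotes
    (e.g. '"keyword"'); we strip them here. This normalization is an
    assumption based on the samples, not a documented guarantee.
    """
    record = json.loads(line)
    return {key.strip('"'): value for key, value in record.items()}

# A simplified line in the same shape as the example above:
line = '{"\\"keyword\\"":"y","\\"clicks\\"":1}'
row = parse_jsonl_line(line)
# row == {"keyword": "y", "clicks": 1}
```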
If you do not apply compression, the files will be saved with the extension “.json”
2.2 CSV
The CSV format will save files in this traditional format and with a “.csv” extension if you haven't provided any compression method. You will need to provide some additional file formatting parameters:
2.2.1 Column delimiter
Possible values for the column delimiter are
- Comma (",")
- Pipe("|")
- Tab("\t")
2.2.2 Quote
The quote parameter indicates how each column in a line of the file will be wrapped in quotes. The options map one-to-one to the quoting constants of Python's csv module:
- All
- minimal
- non-numeric
- None
We recommend the “All” format.
As you may have noticed, the quote character and the column delimiter can conflict with your content if those characters also appear in the values. Therefore, all saved files use a default escape character for conflicting values: the backslash (“\”).
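Since the quote options map to Python's csv module, the file format can be reproduced locally. A short sketch using the Pipe delimiter and the recommended “All” quoting (the exact writer settings, such as disabling double-quoting in favor of backslash escaping, are our assumption based on the escape behavior described above):

```python
import csv
import io

# Assumed one-to-one mapping between this destination's quote options
# and the quoting constants of Python's csv module:
QUOTE_MODES = {
    "all": csv.QUOTE_ALL,
    "minimal": csv.QUOTE_MINIMAL,
    "non-numeric": csv.QUOTE_NONNUMERIC,
    "none": csv.QUOTE_NONE,
}

buf = io.StringIO()
writer = csv.writer(
    buf,
    delimiter="|",               # the "Pipe" delimiter option
    quoting=QUOTE_MODES["all"],  # the recommended "All" option
    doublequote=False,           # assumption: quotes are backslash-escaped,
    escapechar="\\",             # not doubled, per the escape rule above
)
writer.writerow(["a|b", 1, 'say "hi"'])
print(buf.getvalue())  # "a|b"|"1"|"say \"hi\""
```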
2.2.3 Header
You can also choose whether the uploaded files include a header. Note that if you register a notification URL, a header becomes unnecessary, since we send you the object's schema with the columns in the same order in which they were written to the file. The column order also matches the display order in our UI when you click “DETAILS” on a given integration.
2.3 Parquet
Saves data in the Parquet format. If no compression is used, the file extension will be “.parquet”.
3. Compression
You can choose to apply a compression method to your files or not. The available compression methods will change the extension of saved files.
3.1 GZIP
Files compressed with the gzip method will receive the extension “.gz”
4. Notification URL
If the “Notify any URL” parameter is filled in, you will need to enter a valid URL in “URL for notifications”. This URL must be able to receive POST requests.
4.1 When will the URL notification occur?
At the end of an integration, for each object integrated and for each operation, we will make a POST to the specified URL:
- “for each object” means that pipelines that populate more than one table will notify per table
- “for each operation” means that the send operations to the full tables and to the delta tables (if you chose to keep deltas) will make separate notifications
4.2 What if the URL fails?
We will consider a notification successful if the given URL returns a 200 response code within 30 seconds. If the response is different or takes longer, we will make 3 more attempts and flag an error on the integration if the failure persists. Even in that case, files in the full and deltas folders/prefixes will not be deleted.
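Because the 200 response must arrive within 30 seconds, a receiving endpoint should acknowledge immediately and process the files afterwards. A minimal sketch of such an endpoint using only the Python standard library (the handler class and storage list are hypothetical names, not part of Kondado):

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

class NotificationHandler(BaseHTTPRequestHandler):
    """Minimal endpoint sketch for the S3 destination's notifications.

    Kondado expects a 200 within 30 seconds, so we answer right away
    and merely store the payload; real processing (upsert, replace)
    should happen in a background worker.
    """
    received = []  # collected payloads (demo storage only)

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        NotificationHandler.received.append(payload)
        self.send_response(200)  # acknowledge fast; process asynchronously
        self.end_headers()

    def log_message(self, *args):  # silence default request logging
        pass

# Bind to an ephemeral port and serve in the background.
server = HTTPServer(("127.0.0.1", 0), NotificationHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
print(f"listening on port {server.server_address[1]}")
```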
4.3 Payload Sent
With the data that will be sent to your notification URL, you will have all the necessary information to trigger further processes to handle the sent files. The default JSON payload fields are as follows:
4.3.1 pipeline_id
Id of the integration that generated the files. The id of a pipeline can be obtained from the URL of the Kondado pipeline in your browser: https://app.kondado.com.br/pipelines/pipeline_id
Example:
"pipeline_id":231923
4.3.2 bucket
Bucket where the files were uploaded. Corresponds to the bucket registered by you in the destination parameters at the time the integration saved the files
Example:
"bucket":"my-kdd-data-lake"
4.3.3 kdd_operation
Operation that triggered the notification. Possible values are:
- merge_deltas: indicates that this operation means sending data to the delta table
- merge_full: indicates that this operation means sending data to the full table
Example:
"kdd_operation":"merge_deltas"
4.3.4 kdd_replication_type
The replication type of this integration (full/incremental). For incremental integrations, you should “upsert” the file values into your main file. For full integrations, newly uploaded files must replace the previous main file.
The possible values of this parameter are:
- drop: indicates that the integration is full (the whole dataset is reloaded)
- savepoint: indicates that the integration is incremental
This parameter is sent only when kdd_operation=merge_full.
Example:
"kdd_replication_type":"savepoint"
4.3.5 kdd_replication_keys
If your integration is incremental (kdd_replication_type=savepoint), then this field will contain the fields (comma separated) that you should use to upsert with the main file.
Example:
"kdd_replication_keys":"lead_email,campaign_id"
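The upsert you are expected to perform with these keys can be sketched in a few lines of Python (the `upsert` function and the sample rows are hypothetical; rows are assumed to be dicts parsed from the uploaded files):

```python
def upsert(main_rows, delta_rows, replication_keys):
    """Upsert delta records into the main file's records.

    `replication_keys` is the comma-separated string from the
    notification payload (e.g. "lead_email,campaign_id").
    Later delta rows win over earlier rows with the same key.
    """
    keys = [k.strip() for k in replication_keys.split(",")]
    merged = {tuple(row[k] for k in keys): row for row in main_rows}
    for row in delta_rows:
        merged[tuple(row[k] for k in keys)] = row  # insert or replace
    return list(merged.values())

main = [{"lead_email": "a@x.com", "campaign_id": 1, "clicks": 5}]
deltas = [{"lead_email": "a@x.com", "campaign_id": 1, "clicks": 9},
          {"lead_email": "b@x.com", "campaign_id": 2, "clicks": 1}]
rows = upsert(main, deltas, "lead_email,campaign_id")
# rows holds two records, with a@x.com's clicks updated to 9
```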
4.3.6 schema
Array of JSON objects describing the fields sent to the files, in the order they appear (useful for CSV, and for keeping your main aggregator file's typing consistent with your data).
In this array, each json will correspond to a field in the destination file and will contain the following information:
db_name: name of the field in the destination. This is the value you should use to create your main file.
type: field typing, according to Kondado standardization. The field types currently supported by this destination are:
- text
- timestamp
- date
- float
- int
- boolean
- time
key: field name according to the Kondado connector/CMS. For the purposes of aggregating the files into a main file, this field can be ignored; we send it for clarification purposes only
Example:
"schema":[
{
"db_name":"lead_id",
"type":"text",
"key":"id"
},
{
"db_name":"etapa",
"type":"text",
"key":"Etapas__PerguntasRespostas->Etapa"
},
{
"db_name":"pergunta",
"type":"text",
"key":"Etapas__PerguntasRespostas->Pergunta"
}
]
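Because the array preserves the column order written to the files, you can derive a header (or the column list for a main file's DDL) directly from it. A small sketch using the example schema above:

```python
# The schema array from the example payload above:
schema = [
    {"db_name": "lead_id", "type": "text", "key": "id"},
    {"db_name": "etapa", "type": "text",
     "key": "Etapas__PerguntasRespostas->Etapa"},
    {"db_name": "pergunta", "type": "text",
     "key": "Etapas__PerguntasRespostas->Pergunta"},
]

# Column order in the files follows the order of this array, so a
# header line for the main file can be built directly from db_name:
columns = [field["db_name"] for field in schema]
header_line = ",".join(columns)
# header_line == "lead_id,etapa,pergunta"
```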
4.3.7 files
Array with available file names and their location (prefix). For example:
"files":[
"page_hits_deltas/20201030220519913118.gz",
"page_hits_deltas/20201030220533013600.gz"
]
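Combining the `bucket` and `files` fields gives the full location of every file from the run. A trivial sketch (the `s3_uris` helper is hypothetical):

```python
def s3_uris(bucket, files):
    """Build full s3:// URIs from the notification's bucket and files.

    Each entry in `files` already includes its prefix ("folder"),
    so the URI is simply bucket + key.
    """
    return [f"s3://{bucket}/{key}" for key in files]

uris = s3_uris("my-kdd-data-lake",
               ["page_hits_deltas/20201030220519913118.gz",
                "page_hits_deltas/20201030220533013600.gz"])
# uris[0] == "s3://my-kdd-data-lake/page_hits_deltas/20201030220519913118.gz"
```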
4.3.8 file_format
Final file format created. Remember that if the file is compressed, this format will not match the file's extension; rather, the file “inside” the compressed file will be in this format. Example:
"file_format":"json"
4.3.9 file_format_params
If the selected file format requires additional formatting parameters (for example, CSV), this field will contain a JSON object with the parameters used to create the file.
Example (CSV):
"file_format_params":{
"csv_delimiter":"pipe",
"csv_quote":"all",
"csv_include_header":"yes"
}
4.3.10 file_compression
This parameter indicates the compression applied to the output file. If no compression has been applied, its value will be “no”.
Example:
"file_compression":"gzip"
4.3.11 kdd_row_count
This parameter indicates how many records/lines there are in total in the uploaded files.
Example:
"kdd_row_count":29173
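Putting the payload fields together, a receiver can decide what to do with the announced files. A sketch of that dispatch logic, following the rules stated in the kdd_operation and kdd_replication_type sections (the `plan_action` function and its action labels are hypothetical names for illustration):

```python
def plan_action(payload: dict) -> str:
    """Decide how to handle the files announced by a notification.

    Per the sections above: deltas just accumulate; merge_full either
    replaces the main file (drop) or upserts into it using
    kdd_replication_keys (savepoint).
    """
    if payload["kdd_operation"] == "merge_deltas":
        return "append-to-deltas"
    # kdd_replication_type is only sent when kdd_operation=merge_full
    if payload.get("kdd_replication_type") == "drop":
        return "replace-main-file"
    return "upsert:" + payload["kdd_replication_keys"]

payload = {
    "pipeline_id": 231923,
    "bucket": "my-kdd-data-lake",
    "kdd_operation": "merge_full",
    "kdd_replication_type": "savepoint",
    "kdd_replication_keys": "lead_email,campaign_id",
    "kdd_row_count": 29173,
}
print(plan_action(payload))  # upsert:lead_email,campaign_id
```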
5. Prefix
The final part of the file name will always be the timestamp at which the integration was executed, plus the extension, in this format: [YYYYMMDDhhmmssffffff][.][extension]
In addition to this fixed name, the prefix parameter lets you add variables that help organize your bucket:
- [exec_year]: Year (YYYY) in which the integration was executed
- [exec_month]: Month (MM) in which the integration was executed
- [exec_day]: Day of the month (DD) in which the integration was executed
- [exec_hour]: Hour (hh, 24-hour format) in which the integration was executed
- [exec_minute]: Minute (mm) in which the integration was executed
- [exec_second]: Second (ss) in which the integration was executed
- [exec_millisec]: Fractional seconds (ffffff) of the execution of the integration
These variables can be combined with free text and will be concatenated in front of the file name during insertion into S3.
Attention: the brackets [ and ] must be written literally.
Example
Consider that the following prefix has been filled in:
[exec_year]/month=[exec_month]/{day=[exec_day]}/
The final name of the file being this:
20210922161050851020.json
In this case, the name to be written in the destination table will be:
2021/month=09/{day=22}/20210922161050851020.json
Note that a slash “/” was used at the end of the prefix ([exec_year]/month=[exec_month]/{day=[exec_day]}/). If this slash were not included, the file would be 2021/month=09/{day=22}20210922161050851020.json, which may not be what you want.
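The substitution described above can be reproduced locally, for example to predict where a run's files will land. A sketch (the platform does this server-side; `render_prefix` is a hypothetical reimplementation):

```python
from datetime import datetime

def render_prefix(template: str, executed_at: datetime) -> str:
    """Expand the [exec_*] prefix variables for a given execution time."""
    values = {
        "[exec_year]": f"{executed_at:%Y}",
        "[exec_month]": f"{executed_at:%m}",
        "[exec_day]": f"{executed_at:%d}",
        "[exec_hour]": f"{executed_at:%H}",
        "[exec_minute]": f"{executed_at:%M}",
        "[exec_second]": f"{executed_at:%S}",
        "[exec_millisec]": f"{executed_at:%f}",  # fractional seconds, 6 digits
    }
    for var, value in values.items():
        template = template.replace(var, value)
    return template

# Reproducing the example above:
ts = datetime(2021, 9, 22, 16, 10, 50)
prefix = render_prefix("[exec_year]/month=[exec_month]/{day=[exec_day]}/", ts)
# prefix == "2021/month=09/{day=22}/"
```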
6. Date Format
For date and timestamp type columns, you can specify the date format:
- ISO: values will be written in ISO 8601 format
- Epoch: Values will be written in unix epoch time format, in seconds
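The two options correspond to the standard Python representations of a timestamp, illustrated below (the sample timestamp is arbitrary):

```python
from datetime import datetime, timezone

ts = datetime(2021, 9, 22, 16, 10, 50, tzinfo=timezone.utc)

# ISO option: values written in ISO 8601 format
iso_value = ts.isoformat()          # "2021-09-22T16:10:50+00:00"

# Epoch option: values written as unix epoch time, in seconds
epoch_value = int(ts.timestamp())   # 1632327050
```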
General notes
Tables or folders or prefixes?
When we send data to other databases, we call these sets of records “tables”. In S3, records that belong to the same category are grouped by prefixes (commonly called “folders”). To stay consistent with the other destinations we support, we continue to call these sets of records tables. So whenever you see “table” within Kondado referring to the S3 destination, read it as “prefix” (or “folder”). When naming a “full table”, you are choosing the prefix (folder) of the files created in “full” mode.
If you wish, you can use “/” in the table names to create a subfolder structure.
The tables/folders are the “fixed” part of the files insertion, that is, the files will always be sent to the same bucket/folder path. The creation of “sub-folders” with the prefix parameter allows greater dynamism, being possible to specify different folders for years, months, days, times, etc., which will be changed according to the moment of execution of the pipeline.
File creation flow of an integration
When looking at your S3 bucket, you will come across several “folders” whose names look like “kdd_xxxxxxx_staging”. Files under this prefix are currently being assembled by Kondado. Once reading from the source is finished, these staging folders will contain all the files from that run. Kondado then copies these files to the full and deltas tables; at the end of this copy, we trigger the notification to the registered URL.
Before and after each integration, the staging tables are emptied to receive new files. Please do not delete these staging tables yourself, as the integration may fail or, worse, generate incomplete data.
On the Kondado platform, when the destination is a database, our platform handles all data de-duplication with each update and arrival of new data. In the case of S3, however, this flow is your responsibility, using the replication_keys fields. If you don't use the notification URL, these replication fields can be found in the connector's documentation by accessing the relationship graph; they are marked with a circle next to the field. In rare cases, two similar files may be uploaded in the same run, so your deduplication flow should be able to handle this when processing new files.
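The rare same-run duplicate case mentioned above can be handled with a last-one-wins pass over the rows before merging, assuming files are processed in upload order (the `deduplicate` helper is hypothetical):

```python
def deduplicate(rows, replication_keys):
    """Keep only the last occurrence of each replication-key tuple.

    Assumes rows are iterated in file upload order, so rows from
    later files overwrite earlier duplicates.
    """
    keys = [k.strip() for k in replication_keys.split(",")]
    latest = {}
    for row in rows:
        latest[tuple(row[k] for k in keys)] = row  # later rows win
    return list(latest.values())

rows = [{"id": 1, "v": "old"}, {"id": 1, "v": "new"}, {"id": 2, "v": "x"}]
clean = deduplicate(rows, "id")
# clean == [{"id": 1, "v": "new"}, {"id": 2, "v": "x"}]
```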