Working with Databricks and Azure Cosmos DB Analytical Store
Today I would like to share my experience working on one of Quasar Services client’s projects that required a connection to Azure Cosmos DB as a data source.
We will be using Azure Cosmos DB Analytical Store (OLAP) for one obvious reason: running ETL processes against it does not affect the performance of the transactional store (OLTP).
Downstream systems will consume the data from the Analytical Store. In my case, the downstream system is Databricks.
A detailed deep dive into Azure Cosmos DB Analytical Store is out of the scope of this post; you can find pretty good reading on the Microsoft website (https://learn.microsoft.com/en-us/azure/cosmos-db/analytical-store-introduction) and in many other places.
In this post I would like to focus on the best option for consuming data from Azure Cosmos DB Analytical Store.
So first of all, the only way to access Azure Cosmos DB Analytical Store is through Azure Synapse (😲). You can use Change Feed from everywhere, but Analytical Store only from Azure Synapse (Microsoft, really?)
So the first thing to do is create a new Azure Synapse instance. After that we have 2 options for exposing OLAP data from Azure Cosmos DB Analytical Store:
- using serverless Apache Spark pool
- using serverless SQL Pool
The dedicated SQL Pool doesn’t have access to Azure Cosmos DB Analytical Store (😲)
Option 1: Serverless Apache Spark pool (spoiler: it doesn’t work)
All you need to do in order to fetch data from Azure Cosmos OLAP is the following:
cfg = {
    "spark.cosmos.accountEndpoint": cosmosEndpoint,
    "spark.cosmos.accountKey": cosmosMasterKey,
    "spark.cosmos.database": cosmosDatabaseName,
    "spark.cosmos.container": cosmosContainerName,
}
source_entity_df = spark.read.format("cosmos.olap").options(**cfg).load()
Or you can use a Linked Service as well, but that’s not the point. The point is:
If you have more than one object type with a different schema in the same container, the data frame you get back has a schema that is the combination of all the different object types in that container, and there is not much you can do with this kind of data frame.
Let me explain. Let’s assume we have a container called order with two objects in it:
ice_cream = {
    "id": 1,
    "type": "ice_cream",
    "name": "ice_cream",
    "flavor": "chocolate"
}
candy = {
    "id": 2,
    "type": "candy",
    "name": "candy",
    "flavor": "strawberry",
    "exp": "2023-12-01"
}
If you run the following command:
from pyspark.sql.functions import col

source_entity_df = spark.read.format("cosmos.olap").options(**cfg)\
    .load()\
    .filter(col("type") == "ice_cream")
You will get the following result:
Data Frame:
| id | type      | name      | flavor    |
| 1  | ice_cream | ice_cream | chocolate |
which looks absolutely right, but the schema of this Data Frame will be:
root
- id
- type
- name
- flavor
- id
- type
- name
- flavor
- exp
What?! Yes. Good luck with that, and keep in mind that we are usually not talking about 9 fields; usually we are dealing with complicated, nested structures.
Now, this aggregated schema is useless and it’s very hard to continue working with this.
I tried to save it as Delta Table; of course, Delta Table complains about duplicate columns.
Don’t even waste your time.
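If you want to confirm that a data frame is in this state before attempting a write, a quick check of the column names is enough. Here is a minimal sketch in plain Python. The helper name find_duplicate_columns is hypothetical (it is not part of Spark or Delta), and in a notebook you would pass it source_entity_df.columns instead of the hard-coded list:

```python
from collections import Counter


def find_duplicate_columns(columns):
    """Return the column names that occur more than once, in first-seen order."""
    counts = Counter(columns)
    seen = set()
    duplicates = []
    for name in columns:
        if counts[name] > 1 and name not in seen:
            seen.add(name)
            duplicates.append(name)
    return duplicates


# Column names copied from the schema printed above.
merged_columns = ["id", "type", "name", "flavor",
                  "id", "type", "name", "flavor", "exp"]

print(find_duplicate_columns(merged_columns))
# → ['id', 'type', 'name', 'flavor']
```

Any non-empty result means the write to a Delta Table will be rejected for the reason described above.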
So this solution is good only if you have one object type, with a single schema, in each Azure Cosmos DB container and you are NOT planning to add more types with different schemas to that container (which, by the way, contradicts Microsoft best practices on how to build containers in Azure Cosmos DB).
Option 2: Serverless SQL Pool (this works :-) )
OK, Serverless SQL Pool. Here we have the option to create VIEWs that will be our perfect gateway to Azure Cosmos DB Analytical Store. It’s pretty simple and it supports nested JSON structures as well: we can explode them just like in PySpark. Here is an example.
Let’s assume we have a Cosmos DB database called “quasar” and a container called “stars_list”, and we would like to create a VIEW that returns data objects with type = “red_dwarf”.
The CREATE VIEW command will look like the following:
CREATE OR ALTER VIEW give_me_dwarf AS
SELECT type,
       size,
       name,
       year_discovered,
       elementId,
       elementName
FROM OPENROWSET(PROVIDER = 'CosmosDB',
                CONNECTION = 'Account=quasar-dbs;Database=quasar',
                OBJECT = 'stars_list',
                SERVER_CREDENTIAL = 'Lquasar_dbs_creds')
WITH (
    type VARCHAR(2000),
    size VARCHAR(2000) '$.document.size',
    name VARCHAR(2000) '$.document.name',
    year_discovered VARCHAR(2000) '$.document.year_discovered',
    chemical_composition VARCHAR(MAX) '$.document.chemical_composition'
) AS HTAP
CROSS APPLY OPENJSON (chemical_composition)
WITH (
    elementId VARCHAR(2000) '$.elementId',
    elementName VARCHAR(2000) '$.elementName'
)
WHERE type = 'red_dwarf'
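To make the CROSS APPLY OPENJSON step less abstract, here is a plain-Python sketch of what it does to the rows: every entry of the nested chemical_composition array becomes its own flat row, repeated alongside the parent columns. The sample document and the function name flatten_elements are hypothetical; the document shape is only assumed from the JSON paths used in the view.

```python
def flatten_elements(star):
    """Yield one flat row per entry in the star's chemical_composition array."""
    doc = star["document"]
    for element in doc.get("chemical_composition", []):
        yield {
            "type": star["type"],
            "name": doc["name"],
            "elementId": element["elementId"],
            "elementName": element["elementName"],
        }


# Hypothetical sample document, shaped to match the JSON paths in the view.
sample_star = {
    "type": "red_dwarf",
    "document": {
        "name": "Proxima Centauri",
        "chemical_composition": [
            {"elementId": "H", "elementName": "Hydrogen"},
            {"elementId": "He", "elementName": "Helium"},
        ],
    },
}

rows = list(flatten_elements(sample_star))
for row in rows:
    print(row)
```

Like CROSS APPLY, this sketch produces no rows at all for a document whose array is empty or missing; in SQL you would switch to OUTER APPLY if you need to keep such documents.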
That's it. Now we can use this VIEW and consume objects from Azure Cosmos DB OLAP (via Spark, C#, or whatever technology you choose) without affecting OLTP performance.
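On the Databricks side, one way to read the view is through Spark's generic JDBC reader pointed at the Synapse serverless SQL endpoint. The sketch below is an assumption-heavy outline, not the setup from the project: the workspace name, database name, and both helper functions are hypothetical, it assumes SQL authentication with a user and password, and it assumes the SQL Server JDBC driver is available on the cluster.

```python
def build_serverless_jdbc_url(workspace, database):
    """Build a SQL Server JDBC URL for a Synapse serverless SQL endpoint."""
    # Serverless SQL pools are exposed on the "<workspace>-ondemand" host name.
    host = f"{workspace}-ondemand.sql.azuresynapse.net"
    return (
        f"jdbc:sqlserver://{host}:1433;"
        f"database={database};encrypt=true;loginTimeout=30;"
    )


def read_view(spark, url, view, user, password):
    """Load a view into a DataFrame through Spark's JDBC data source."""
    return (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", view)
        .option("user", user)
        .option("password", password)
        .load()
    )


# Placeholder names; replace them with your own workspace and database.
url = build_serverless_jdbc_url("my-workspace", "my_database")
print(url)
```

In a notebook you would then call something like read_view(spark, url, "give_me_dwarf", user, password), ideally pulling the credentials from a secret scope rather than hard-coding them.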
Thank You for reading, I HOPE it helps!