
When you’re using the cloud to store your data, you need to be able to transfer that data up to the cloud both quickly and reliably. Data can be transferred to Data Bridge in a single-stream upload by requesting a pre-signed upload URL and then uploading to it before calling import; a guide for this has been created here. That process is fine for relatively small databases, but once uploads become large (100 MB or more, per AWS guidance), you should consider using multi-part uploads for transferring data to the cloud.

RMS recommends that you use multi-part upload for all integration projects. While this process may be more complicated than a single upload stream, it has a host of benefits:

  • Greater throughput utilization: A single stream can be constrained by individual parts of the network, preventing you from using your full network capacity. Sending multiple streams of data in parallel lets you use more of that capacity.
  • Better recovery from network issues: Networks fail all the time, and often a simple retry is all that’s needed to get going again. But if you’re transferring a 10 GB database in a single stream and it fails after 9 GB has been transferred, you have to upload the entire 10 GB again. With multi-part uploads, chunks are uploaded to the cloud individually, so if any individual upload fails, only that chunk needs to be re-uploaded.

This is a three-step process:

  1. Generate an upload ID from Data Bridge: /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/init-upload
  2. Break the file down into chunks (see the chunk-reader sketch after this list) and iterate over the following process, starting at part number 1:
    1. Get a pre-signed S3 URL to upload the part to:
      /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}
    2. Upload the file chunk to S3 with a PUT request to the pre-signed URL, sending the contents of the file as an octet-stream.
  3. Once the entire file has been chunked and uploaded, tell Data Bridge that you have finished uploading the file by issuing a POST request to
    /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/complete-upload
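
As a point of reference for step 2, here is a minimal Python sketch of the chunking itself. It is independent of the Data Bridge API, and the 20 MB default simply matches the chunk size used later in this tutorial:

def read_chunks(path, chunk_size=20 * 1024 * 1024):
    # Yield (part_number, chunk) pairs; part numbers start at 1 and the
    # final chunk may be smaller than chunk_size.
    with open(path, "rb") as f:
        part_number = 1
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                return
            yield part_number, chunk
            part_number += 1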

In this tutorial, we’re omitting a couple of things that you should consider when implementing in production. For example:

* Multi-threaded streaming uploads can increase the throughput of uploads (a sketch appears near the end of this walkthrough)

* Failure recovery and exception handling ensure that if one chunk fails, it is retried so the entire process does not fail (see the retry sketch just below this list)
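
For example, per-chunk failure recovery can be as simple as wrapping the PUT in a retry loop. The sketch below is illustrative rather than part of the tutorial code, and it assumes the pre-signed URL is still valid when the retry happens:

import time
import requests

def upload_chunk_with_retry(url, chunk, max_attempts=3):
    # Re-PUT a single chunk with exponential backoff so that one transient
    # network failure does not abort the entire multi-part upload.
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.put(
                url,
                headers={"Content-Type": "application/octet-stream"},
                data=chunk,
            )
            response.raise_for_status()
            return response.headers["ETag"].strip('"')
        except requests.RequestException:
            if attempt == max_attempts:
                raise
            time.sleep(2 ** attempt)  # back off before retrying: 2s, 4s, ...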

  • C# Tutorial - This tutorial was created using .NET Core 3.1 and should also work on .NET 5. The only external dependency used in this walkthrough is Newtonsoft.Json, which can be added as a NuGet package.
  • Python Tutorial - This tutorial requires one external dependency, the requests library, which can be installed with pip. The sample was built using Python 3.9.

Generate an Upload ID From Data Bridge

Before you get started with uploading your EDM to the RMS Intelligent Risk Platform™ using multi-part upload, you must first generate an upload ID from Data Bridge. To do so, you’ll need the name of the SQL instance to upload the database to. You can find it by querying /databridge/v1/sql-instances and determining which of the returned instances to send it to.

In this case, we’re going to assume there is only one SQL Instance returned and we’ll use that. The request is as follows:

C#

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        string baseUrl = "https://api-euw1.rms.com";
        string apiKey = "<<YOUR API KEY HERE>>";

        client.BaseAddress = new Uri(baseUrl);

        // Set the auth API key
        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(apiKey);

        var sqlInstance = await GetFirstSqlInstance(client);
    }
}

private static async Task<string> GetFirstSqlInstance(HttpClient client)
{
    using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "databridge/v1/sql-instances"))
    {
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

        using (HttpResponseMessage response = await client.SendAsync(request))
        {
            if (response.IsSuccessStatusCode)
            {
                using (HttpContent content = response.Content)
                {
                    var jsonString = await content.ReadAsStringAsync();
                    var jsonResponse = JArray.Parse(jsonString);
                    return jsonResponse[0]["name"].ToString();
                }
            }
            throw new Exception("Unable to get SQL instance names. HTTP status code: " + response.StatusCode);
        }
    }
}

Python

import requests

def get_auth_header(auth_key):
    headers = {
        "Authorization": auth_key
    }
    return headers

def get_sql_instance(base_url, headers):
    url = f"{base_url}/databridge/v1/sql-instances"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()[0]["name"]

if __name__ == "__main__":
    base_url = "https://api-euw1.rms.com"
    api_key = "xxx"
    auth_header = get_auth_header(api_key)
    sql_instance = get_sql_instance(base_url, auth_header)


Now that you’ve retrieved your SQL instance, you can construct the request to generate an upload ID from Data Bridge using the following path: /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/init-upload.

You’ll notice a couple of other things you need to add into the URL path: databaseName and fileExtension. The databaseName is the name you’d like to give the database once it’s been uploaded to Data Bridge. The fileExtension indicates the type of file you’re sending up; the options are mdf, bak, and dacpac.

For our purposes, we’re going to send up a .mdf file.


C#

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        // Previous steps omitted here
        var edmName = "my_edm";
        var fileExtensionLiteral = "mdf";
        var uploadId = await GenerateUploadId(client, sqlInstance, fileExtensionLiteral, edmName);
    }
}

private static async Task<string> GenerateUploadId(HttpClient client, string sqlInstance,
    string fileExtension, string databaseName)
{
    using (HttpResponseMessage response =
        await client.PostAsync($"databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload", null))
    {
        if (response.IsSuccessStatusCode)
        {
            using (HttpContent content = response.Content)
            {
                var jsonString = await content.ReadAsStringAsync();
                var jsonResponse = JObject.Parse(jsonString);
                return jsonResponse["uploadId"].ToString();
            }
        }
        throw new Exception("Unable to get upload ID. HTTP status code: " + response.StatusCode);
    }
}

Python

def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    return response.json()["uploadId"]

if __name__ == "__main__":
    # Previous steps omitted here
    database_name = "my_python_edm"
    file_extension = "mdf"
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance)


Now that you have your upload ID, you can start chunking the file and sending its parts up.

Break Down the File Into Chunks and Upload Each Chunk Individually

This is the step where the majority of the work takes place. In C#, you start by opening the file with a FileStream, reading a chunk of the specified size into a buffer, and uploading that chunk before moving on to the next.


C#

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        // Previous steps omitted here
        var localFilePath = "my_edm.mdf";
        var bufferSizeInBytes = 20 * 1024 * 1024;
        string fileExtensionLiteral = "mdf";

        var etags = await UploadFilesUsingMultiPartUpload(client, edmName, localFilePath,
            fileExtensionLiteral, bufferSizeInBytes, sqlInstance, uploadId);
    }
}

private static async Task<Dictionary<string, string>> UploadFilesUsingMultiPartUpload(HttpClient client,
    string databaseName, string localFilePath, string fileExtension,
    int bufferSizeInBytes, string sqlInstanceName, string uploadId)
{
    using (FileStream fileStream = File.OpenRead(localFilePath))
    {
        var partNumber = 1;
        var etags = new Dictionary<string, string>();
        while (true)
        {
            var buffer = new byte[bufferSizeInBytes];
            // Read a chunk from the file the size of the buffer
            var bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            if (bytesRead == 0)
            {
                break;
            }
            // Get partial upload URL
            var uploadUrl = await GetPresignedPartialURL(client, databaseName,
                fileExtension, sqlInstanceName, uploadId, partNumber);
            // Upload chunk to URL
            etags.Add(partNumber.ToString(),
                await UploadByteArray(buffer, bytesRead, uploadUrl));
            partNumber++;
        }
        return etags;
    }
}

private async static Task<string> GetPresignedPartialURL(HttpClient client, string databaseName,
    string fileExtension, string sqlInstanceName, string uploadId, int partNumber)
{
    using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get,
        $"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"))
    {
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        using (HttpResponseMessage response = await client.SendAsync(request))
        {
            using (HttpContent content = response.Content)
            {
                var url = await content.ReadAsStringAsync();
                return url.Trim('"');
            }
        }
    }
}

private async static Task<string> UploadByteArray(byte[] buffer, int length, string uploadUrl)
{
    using (HttpClient client = new HttpClient())
    {
        using (ByteArrayContent content = new ByteArrayContent(buffer, 0, length))
        {
            content.Headers.Add("Content-Type", "application/octet-stream");
            using (HttpResponseMessage response = await client.PutAsync(uploadUrl, content))
            {
                if (response.IsSuccessStatusCode)
                {
                    return response.Headers.ETag.Tag.Trim('"');
                }

                throw new Exception("Unable to upload chunk. HTTP status code: " + response.StatusCode);
            }
        }
    }
}

Python

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    etags = {}
    with open(local_file_path, 'rb') as db_file:
        chunk = db_file.read(chunk_size_bytes)
        part_number = 1
        while chunk:
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number)
            etag = upload_chunk_to_url(url, chunk)
            etags[str(part_number)] = etag
            chunk = db_file.read(chunk_size_bytes)
            part_number = part_number + 1


def upload_chunk_to_url(url, chunk):
    headers = {
        "Content-Type": "application/octet-stream"
    }
    response = requests.put(url, headers=headers, data=chunk)
    response.raise_for_status()
    return response.headers["Etag"].strip('"')


def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    # Strip surrounding quotes in case the URL comes back as a JSON string,
    # mirroring the Trim('"') in the C# sample
    return response.text.strip('"')

if __name__ == "__main__":
    # Previous steps omitted here
    chunk_size = 20 * 1024 * 1024
    local_path = "my_python_edm.mdf"
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id)


In the above sample, a couple of things are worth pointing out. The first is that you iterate over blocks of the file: if the file is 100 MB in size and you’re breaking it into 20 MB chunks, you should expect the loop to run five times. Don’t forget to include a break clause for when you’ve reached the end of the file; otherwise, you’ll run into an infinite loop.
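
If you want to sanity-check that loop count up front, the expected number of parts is just the file size divided by the chunk size, rounded up. This snippet is illustrative and not part of the tutorial code:

import math
import os

chunk_size_bytes = 20 * 1024 * 1024  # 20 MB, as in the samples
file_size_bytes = os.path.getsize("my_python_edm.mdf")
expected_parts = math.ceil(file_size_bytes / chunk_size_bytes)  # e.g., 100 MB -> 5 parts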

You read a section of the file into a buffer byte array, then get a pre-signed URL from the Data Bridge API telling you where to PUT that chunk of the file.

Once the URL has been retrieved, you upload the byte array to it. Note that the content is sent as an octet stream; this is the content type for transmitting binary data, which is what your byte arrays are.

The last thing to note is the dictionary called etags that you’re caching responses into. You collect the ETag returned for each part so you can pass them all to the next phase of the workflow: issuing the complete-upload request.
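
For illustration, the complete-upload payload for a two-part upload would look something like the dictionary below; the uploadId and ETag values here are made up:

payload = {
    "uploadId": "example-upload-id",  # value returned by init-upload
    "etags": {
        "1": "9b2cf535f27731c974343645a3985328",  # ETag returned for part 1
        "2": "6f5902ac237024bdd0c176cb93063dc4"   # ETag returned for part 2
    }
}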

Issue Upload Complete Statement

You’re now on to the last step in the multi-part upload: issuing a complete-upload request. The samples below also attach the uploaded database by calling the import endpoint and then poll the resulting job until it succeeds.


C#

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        // Previous steps omitted here
        await CompleteUpload(client, edmName, fileExtensionLiteral, sqlInstance,
            uploadId, etags);
        await AttachDatabase(client, sqlInstance, edmName, /*MDF = 0, BAK = 1*/0);
    }
}

private static async Task CompleteUpload(HttpClient client, string databaseName,
    string fileExtension, string sqlInstanceName, string uploadId,
    Dictionary<string, string> etags)
{
    using (HttpContent payload = new StringContent(
        string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId,
        JsonConvert.SerializeObject(etags)), Encoding.UTF8, "application/json"))
    {
        using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload", payload))
        {
            if (!response.IsSuccessStatusCode)
            {
                throw new Exception("Unable to complete upload. HTTP status code: " + response.StatusCode);
            }
        }
    }
}

private static async Task AttachDatabase(HttpClient client, string instanceName, string edmName, int fileFormat)
{
    using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{instanceName}/Databases/{edmName}/import?importFrom={fileFormat}", null))
    {
        if (response.IsSuccessStatusCode)
        {
            using (HttpContent content = response.Content)
            {
                var jsonString = await content.ReadAsStringAsync();
                var jsonResponse = JObject.Parse(jsonString);
                string jobId = jsonResponse["jobId"].ToString();
                // Poll until the import job is complete
                await PollJobStatus(client, jobId);
            }
        }
    }
}

private static async Task PollJobStatus(HttpClient client, string jobId,
    int sleepIntervalInMilliseconds = 5000, int maxWaitTimeInMilliseconds = 300000)
{
    string status;
    int totalWaitTime = 0;
    while (totalWaitTime < maxWaitTimeInMilliseconds)
    {
        // Query the Job API
        using (HttpResponseMessage response = await client.GetAsync($"databridge/v1/Jobs/{jobId}"))
        {
            using (HttpContent content = response.Content)
            {
                status = await content.ReadAsStringAsync();
                if (status == "Succeeded")
                {
                    break;
                }
                Thread.Sleep(sleepIntervalInMilliseconds);
                totalWaitTime += sleepIntervalInMilliseconds;
            }
        }
    }
}


Python

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    # Previous steps omitted
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload"
    payload = {
        "uploadId": upload_id,
        "etags": etags
    }
    response = requests.post(complete_url, headers=headers, json=payload)
    response.raise_for_status()

def attach_database(base_url, headers, sql_instance, database_name, file_type):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/Databases/{database_name}/import?importFrom={file_type}"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    job_id = response.json()["jobId"]
    status = 'InProgress'
    total_wait_time = 0
    while total_wait_time < 300:
        response = requests.get(f"{base_url}/databridge/v1/Jobs/{job_id}", headers=headers)
        status = response.text
        if status != 'Succeeded':
            time.sleep(5)
            total_wait_time += 5
        else:
            break



You’ve seen several requests to the Data Bridge API at this stage, so there isn’t much new here besides the inclusion of the etags payload. You add the uploadId and the etags collected from each uploaded chunk and send them as the payload to Data Bridge.
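
As noted at the start, these samples upload parts sequentially; the multi-threaded variant mentioned in the production considerations can be built from the same pieces. The sketch below is a rough illustration rather than part of the tutorial. It assumes the get_presigned_url and upload_chunk_to_url functions from the Python walkthrough above, and that all chunks in the batch fit in memory:

from concurrent.futures import ThreadPoolExecutor

def upload_parts_in_parallel(base_url, headers, database_name, file_extension,
                             sql_instance, upload_id, parts, max_workers=4):
    # `parts` maps part numbers to chunks that have already been read from disk.
    # Pre-signed URLs are fetched serially; each PUT runs on a worker thread.
    etags = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {}
        for part_number, chunk in parts.items():
            url = get_presigned_url(base_url, headers, database_name,
                                    file_extension, sql_instance, upload_id, part_number)
            futures[pool.submit(upload_chunk_to_url, url, chunk)] = part_number
        for future, part_number in futures.items():
            etags[str(part_number)] = future.result()  # re-raises if that part failed
    return etags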

The full sample code to achieve this multi-part upload in C# is below.

Complete C# Sample Code


C#

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Threading.Tasks;
using System.Net.Http.Headers;
using System.Globalization;
using System.Threading;
using System.Text;

namespace UploadSample
{
    public class MultiPartUploadProgram
    {
        public enum FileType
        {
            MDF = 0,
            BAK = 1,
            DACPAC = 2
        }

        public static async Task Main(string[] args)
        {
            using (HttpClient client = new HttpClient())
            {
                FileType fileExtension;
                string baseUrl = "https://api-euw1.rms.com";
                string apiKey = args[0];
                var edmName = args[1];
                var localFilePath = args[2];

                // Override baseUrl if defined
                if (args.Length > 3)
                {
                    baseUrl = args[3];
                }
                client.BaseAddress = new Uri(baseUrl);

                var bufferSizeInBytes = 20 * 1024 * 1024;

                if (localFilePath.EndsWith("mdf", true, CultureInfo.InvariantCulture))
                {
                    fileExtension = FileType.MDF;
                }
                else if (localFilePath.EndsWith("bak", true, CultureInfo.InvariantCulture))
                {
                    fileExtension = FileType.BAK;
                }
                else
                {
                    Console.WriteLine("Invalid file extension. Supported extensions are .mdf and .bak");
                    return;
                }

                string fileExtensionLiteral = Enum.GetName(typeof(FileType), fileExtension).ToLowerInvariant();

                // Set the auth API key
                client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(apiKey);

                var sqlInstance = await GetSqlInstance(client);
                var uploadId = await GenerateUploadId(client, sqlInstance, fileExtensionLiteral, edmName);
                var etags = await UploadFilesUsingMultiPartUpload(client, edmName, localFilePath, fileExtensionLiteral, bufferSizeInBytes, sqlInstance, uploadId);
                await CompleteUpload(client, edmName, fileExtensionLiteral, sqlInstance, uploadId, etags);
                await AttachDatabase(client, sqlInstance, edmName, fileExtension);
            }
        }

        private static async Task AttachDatabase(HttpClient client, string instanceName, string edmName, FileType fileFormat)
        {
            int fileExtensionValue = (int)fileFormat;
            using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{instanceName}/Databases/{edmName}/import?importFrom={fileExtensionValue}", null))
            {
                if (response.IsSuccessStatusCode)
                {
                    using (HttpContent content = response.Content)
                    {
                        var jsonString = await content.ReadAsStringAsync();
                        var jsonResponse = JObject.Parse(jsonString);
                        string jobId = jsonResponse["jobId"].ToString();
                        // Poll until the import job is complete
                        await PollJobStatus(client, jobId);
                    }
                }
            }
        }

        private static async Task PollJobStatus(HttpClient client, string jobId, int sleepIntervalInMilliseconds = 5000, int maxWaitTimeInMilliseconds = 300000)
        {
            string status;
            int totalWaitTime = 0;
            while (totalWaitTime < maxWaitTimeInMilliseconds)
            {
                // Query the Job API
                using (HttpResponseMessage response = await client.GetAsync($"databridge/v1/Jobs/{jobId}"))
                {
                    using (HttpContent content = response.Content)
                    {
                        status = await content.ReadAsStringAsync();
                        if (status == "Succeeded")
                        {
                            break;
                        }
                        Thread.Sleep(sleepIntervalInMilliseconds);
                        totalWaitTime += sleepIntervalInMilliseconds;
                    }
                }
            }
        }

        private static async Task<Dictionary<string, string>> UploadFilesUsingMultiPartUpload(HttpClient client, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId)
        {
            using (FileStream fileStream = File.OpenRead(localFilePath))
            {
                var partNumber = 1;
                var etags = new Dictionary<string, string>();

                while (true)
                {
                    var buffer = new byte[bufferSizeInBytes];
                    var bytesRead = fileStream.Read(buffer, 0, buffer.Length); // Read a chunk from the file the size of the buffer
                    if (bytesRead == 0)
                    {
                        break;
                    }
                    // Get partial upload URL
                    var uploadUrl = await GetPresignedPartialURL(client, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber);

                    // Upload chunk to URL
                    etags.Add(partNumber.ToString(), await UploadByteArray(buffer, bytesRead, uploadUrl));
                    partNumber++;
                }
                return etags;
            }
        }

        private async static Task<string> GetPresignedPartialURL(HttpClient client, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber)
        {
            using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, $"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"))
            {
                request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                using (HttpResponseMessage response = await client.SendAsync(request))
                {
                    using (HttpContent content = response.Content)
                    {
                        var url = await content.ReadAsStringAsync();
                        return url.Trim('"');
                    }
                }
            }
        }

        private async static Task<string> UploadByteArray(byte[] buffer, int length, string uploadUrl)
        {
            using (HttpClient client = new HttpClient())
            {
                using (ByteArrayContent content = new ByteArrayContent(buffer, 0, length))
                {
                    content.Headers.Add("Content-Type", "application/octet-stream");
                    using (HttpResponseMessage response = await client.PutAsync(uploadUrl, content))
                    {
                        if (response.IsSuccessStatusCode)
                        {
                            return response.Headers.ETag.Tag.Trim('"');
                        }

                        throw new Exception("Unable to upload chunk. HTTP status code: " + response.StatusCode);
                    }
                }
            }
        }

        private static async Task CompleteUpload(HttpClient client, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags)
        {
            using (HttpContent payload = new StringContent(string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags)), Encoding.UTF8, "application/json"))
            {
                using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload", payload))
                {
                    if (!response.IsSuccessStatusCode)
                    {
                        throw new Exception("Unable to complete upload. HTTP status code: " + response.StatusCode);
                    }
                }
            }
        }

        private static async Task<string> GenerateUploadId(HttpClient client, string sqlInstance, string fileExtension, string databaseName)
        {
            using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload", null))
            {
                if (response.IsSuccessStatusCode)
                {
                    using (HttpContent content = response.Content)
                    {
                        var jsonString = await content.ReadAsStringAsync();
                        var jsonResponse = JObject.Parse(jsonString);
                        return jsonResponse["uploadId"].ToString();
                    }
                }
                throw new Exception("Unable to get upload ID. HTTP status code: " + response.StatusCode);
            }
        }

        private static async Task<string> GetSqlInstance(HttpClient client)
        {
            using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "databridge/v1/sql-instances"))
            {
                request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                using (HttpResponseMessage response = await client.SendAsync(request))
                {
                    if (response.IsSuccessStatusCode)
                    {
                        using (HttpContent content = response.Content)
                        {
                            var jsonString = await content.ReadAsStringAsync();
                            var jsonResponse = JArray.Parse(jsonString);
                            return jsonResponse[0]["name"].ToString();
                        }
                    }
                    throw new Exception("Unable to get SQL instance names. HTTP status code: " + response.StatusCode);
                }
            }
        }
    }
}


The full sample code to achieve this multi-part upload in Python is below.

Complete Python Sample Code


Python

import requests
import time

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    etags = {}
    with open(local_file_path, 'rb') as db_file:
        chunk = db_file.read(chunk_size_bytes)
        part_number = 1
        while chunk:
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number)
            etag = upload_chunk_to_url(url, chunk)
            etags[str(part_number)] = etag
            chunk = db_file.read(chunk_size_bytes)
            part_number = part_number + 1
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload"
    payload = {
        "uploadId": upload_id,
        "etags": etags
    }
    response = requests.post(complete_url, headers=headers, json=payload)
    response.raise_for_status()


def upload_chunk_to_url(url, chunk):
    headers = {
        "Content-Type": "application/octet-stream"
    }
    response = requests.put(url, headers=headers, data=chunk)
    response.raise_for_status()
    return response.headers["Etag"].strip('"')


def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    # Strip surrounding quotes in case the URL comes back as a JSON string,
    # mirroring the Trim('"') in the C# sample
    return response.text.strip('"')


def get_sql_instance(base_url, headers):
    url = f"{base_url}/databridge/v1/sql-instances"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()[0]["name"]


def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    return response.json()["uploadId"]


def attach_database(base_url, headers, sql_instance, database_name, file_type):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/Databases/{database_name}/import?importFrom={file_type}"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    job_id = response.json()["jobId"]
    status = 'InProgress'
    total_wait_time = 0
    while total_wait_time < 300:
        response = requests.get(f"{base_url}/databridge/v1/Jobs/{job_id}", headers=headers)
        status = response.text
        if status != 'Succeeded':
            time.sleep(5)
            total_wait_time += 5
        else:
            break


def get_auth_header(auth_key):
    headers = {
        "Authorization": auth_key
    }
    return headers


if __name__ == "__main__":
    base_url = "https://api-euw1.rms.com"
    api_key = "***"
    database_name = "my_python_edm"
    local_path = "my_python_edm.mdf"
    file_extension = "mdf"
    chunk_size = 20 * 1024 * 1024
    auth_header = get_auth_header(api_key)
    sql_instance = get_sql_instance(base_url, auth_header)
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance)
    upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id)
    attach_database(base_url, auth_header, sql_instance, database_name, 0)


Anish Patel
Head of Technology Consulting, RMS

Anish has spent his career delivering technical solutions to end users in pursuit of workflow efficiencies and business differentiating abilities.

He is currently the Head of Technology Consulting at RMS, the world’s leading catastrophe risk modelling company. Anish’s responsibilities include overseeing the Technology Consulting team across all aspects of the technical project lifecycle from solution discovery through implementation.

Prior to his time at RMS, Anish designed and delivered fixed income and structured product trading systems at Goldman Sachs and Commerzbank. During his time in banking, he was a software engineer who sat directly with traders and structurers, which gave him a deep understanding of the business processes.

Anish holds a first-class BSc in Economics and Finance from Brunel University and a PGCert in Mathematical Finance from the University of York. He also holds several professional software development accreditations, including Java Professional Programmer, and is a certified Scrum Master.

Follow Anish on Twitter @codeanish and on GitHub https://github.com/codeanish
