December 31, 2022
What's New on the RMS Intelligent Risk Platform?

Moody’s RMS continues to advance the RMS Intelligent Risk Platform™ (IRP), the open, modular, and cloud-native digital foundation for RMS models, data, applications, and unified risk analytics. As a gateway to robust risk analytics and in-depth risk data, the IRP brings together a trusted data repository, collaborative applications, and open application programming interfaces (APIs) to help deliver better business outcomes.

We released several updates to both the RMS Risk Modeler™ and RMS ExposureIQ™ applications in December. In this blog, we demonstrate our commitment to our clients through continued investment in the latest science, technology, and data, with an update on both new solution features and improvements to existing capabilities.

A major focus for our latest Risk Modeler and ExposureIQ release is to prepare for the introduction of our new RMS® Terrorism HD Model. This probabilistic terrorism model for property exposures will be available to clients in January. In addition to this new release, we have made several big improvements to existing capabilities, including the additions below.

Improvements to the Results Data Module (RDM) Schema for HD Modeled Losses (Risk Modeler): As the RDM schema continues to be important for clients transferring modeled losses, the new update allows HD modeled losses to be easily included within RDMs.

Updated 2023 Workers' Compensation Cost Severities (Risk Modeler): RMS has updated the U.S. workers’ compensation cost severity data for the calendar year 2023 to reflect the following changes in U.S. workers’ compensation:

- Incorporation of the latest state-level regulatory changes, wage levels, and benefit provisions per the U.S. National Council on Compensation Insurance (NCCI).
- Introduction of the latest demographic data from sources including the U.S. Census, the Centers for Disease Control and Prevention (CDC), and the U.S. Bureau of Labor Statistics.
- Incorporation of the latest NCCI medical claim cost data and state medical cost relativities.

Export of Premium Hazard Data (Risk Modeler): With the November Risk Modeler release, users could return premium hazard data layers for our U.S. risk models together with selected European models, including flood and earthquake. In this latest December release, users can now export that data to an Exposure Data Module (EDM) to, for example, understand the distribution of locations by hurricane risk score, establish how much exposure is at risk of flooding above a certain severity level, or compare the hazard profiles of different portfolios.

Simpler 'What-if' Analyses with Business-Wide Views (ExposureIQ): For ExposureIQ users enrolled in the Business-Wide Views preview program, RMS has added new functionality enabling users to create hierarchies by simply copying an existing hierarchy and editing it as needed. This approach allows users to quickly recreate a complex structure, which is helpful when several similar hierarchies are required to run 'what-if' accumulations.

Finer Detail on Location Information for Exposures in Portfolios (ExposureIQ): This update simplifies access to finer-grain location details for CRESTA, Admin3, and city data in ExposureIQ.

You can find details of these and all other improvements for Risk Modeler and ExposureIQ in our documentation available on the RMS client support portal, RMS OWL.

Introducing the RMS Terrorism HD Model

We expect to launch an important update to Risk Modeler during January with the introduction of the new RMS Terrorism HD Model, which uses the HD (high-definition) model framework to deliver probabilistic terrorism modeling for property exposures. For some twenty years, RMS has delivered the leading terrorism risk model to the market, and we’re excited to add this model to the RMS Intelligent Risk Platform alongside our other peril models.
The RMS Terrorism HD Model is delivered through Risk Modeler to provide terrorism modeling for property exposures on the Intelligent Risk Platform. The HD model execution framework also provides an option for probabilistic terrorism modeling, with countries supported by the Probabilistic Terrorism Model (PTM) including Belgium, Canada, Denmark, France, Ireland, Italy, Turkey, the United Kingdom, and the United States. Using the RMS HD framework allows the proprietary model methodology to consider a wide range of conventional and chemical, biological, radiological, and nuclear (CBRN) macro-scale terrorism events and increases modeling transparency. Such events include individual attacks as well as more complex swarm or cluster attacks across thousands of potential targets.

Join Our IQ Application Preview Programs

Are you interested in providing feedback on some of our new RMS underwriting solutions? We have launched a preview program to test RMS UnderwriteIQ™, RMS TreatyIQ™, and Business Hierarchies in RMS ExposureIQ. Each application draws on more than 30 years of risk analytics leadership and takes advantage of our industry-leading models, data products, financial modeling, and a unified data store to deliver impactful analytics across the insurance lifecycle, from primary underwriting to portfolio and cedant management. Join the program and you’ll have the opportunity to test and influence the roadmap for the following solutions:

UnderwriteIQ: Built for insurance underwriters looking to integrate RMS’s deep science and modeling into their underwriting and risk pricing strategy, UnderwriteIQ delivers flexible, powerful account-level modeling capabilities with low latencies. A cloud-native and collaborative underwriting analytics solution, it is fully integrated with Risk Modeler and other Intelligent Risk Platform applications to help you understand the marginal impact on your portfolio.
TreatyIQ: Built for reinsurance underwriters looking for deep treaty program pricing and analytics, TreatyIQ comes with advanced financial modeling and flexible pricing formulas to give underwriters actionable foresight of risk by analyzing the marginal impact, combined ratio, and return on capital for every quote. TreatyIQ’s game-changing portfolio roll-up performance for monitoring risk positions, analyzing risk drivers, and assessing pricing performance speeds up workflows, reduces costs, and improves technical pricing and profitability.

ExposureIQ Business Hierarchies: ExposureIQ today provides powerful accumulation analytics and real-time event response capabilities. With the addition of Business Hierarchies, exposure managers can get a complete, rolled-up view of exposures across multiple books and can quickly identify organization-wide exposures across both insurance and reinsurance entities to improve portfolio management decisions. Backed by the sophisticated RMS financial model, customers will soon be able to apply complex inward and outward reinsurance structures for a true view of net reinsurance right up to the group level.

Over the next couple of weeks, customers will have the opportunity to preview these applications for feedback. If you are interested, please contact info@rms.com.

RMS Intelligent Risk Platform

For additional information on Risk Modeler, ExposureIQ, or any of the IQ applications on the IRP, please visit the Intelligent Risk Platform webpages. You can also access release notes on RMS OWL, the RMS client support portal.

Staff Product Management
November 30, 2022
What’s New on the RMS Intelligent Risk Platform?

RMS® is constantly adding new capabilities to the RMS Intelligent Risk Platform™ (IRP), our open, modular, and cloud-native digital foundation for RMS models and unified risk analytics. As a gateway to robust risk analytics and in-depth risk data, the IRP brings together a trusted data repository, collaborative applications, and open application programming interfaces (APIs) to help deliver better business outcomes. The range of powerful, cloud-native applications hosted on the IRP allows multiple users to gain insights into potential hazards, exposures, and accumulations. During November, RMS released an update to the RMS Risk Modeler™ application focused on giving clients access to the latest technologies and the highest quality data. In this blog, we demonstrate our commitment to investing in the latest science, technology, and data, and provide an overview of both the new features and improvements to existing capabilities.

Latest Release for Risk Modeler

In our latest Risk Modeler release, we have introduced new premium hazard data layers along with several updates to support these data layers, including geocoding and hazard retrieval, enhancements to the Exposure Data Module (EDM), and changes to our APIs. In addition to these new features, we have also made several big improvements to existing capabilities, including:

- Expanded export capabilities for HD modeling with treaty metadata results
- Workflow enhancements for marginal impact analysis that improve data governance
- Better search experience for databases and analysis results

New Premium Hazard Data Layers for Risk Modeler

With data so critical to effective risk selection and pricing decisions, November’s Risk Modeler update includes new premium hazard data layers that are seamlessly incorporated into the geocoding and hazard retrieval workflow. This allows Risk Modeler clients to run all licensed premium hazard services alongside geocoding and hazard retrieval.
At present, these data layers are available as a preview for all users. At the end of the preview, the layers will be turned off for clients that have not licensed them. The benefits of these premium hazard data layers include enhanced risk management practices through the monitoring of portfolio risk distribution across various cat perils, using effective, high-quality, and consistent data to help inform underwriting and risk selection decisions. The November update to Risk Modeler incorporates premium hazard data layers for our U.S. risk models and selected European models such as flood and earthquake, with additional premium hazard layers to be made available in future releases. The following new premium hazard data layers are now available:

Risk Scores: Risk scores are based on the mean damage ratio or average annual loss results from big-data catastrophe simulations, considering the implications of different primary characteristics. Available U.S. perils include earthquake, windstorm, wildfire, flood, severe convective storm, winter storm, and terrorism.

U.S. Earthquake Layers: Hazard layers include the distance from a site's location to the closest five faults in the U.S., Modified Mercalli Intensity (MMI) scores that represent the local shaking intensity and related effects of an earthquake, and the gradual settling or sudden sinking of the earth's surface.

U.S. Windstorm Layers: New hazard layers comprise coastal areas eligible for participation in state-sponsored insurance pools in nine hurricane-exposed U.S. states, and a "distance-to-water" metric for the U.S. that includes rivers that flow inland and minor tributaries.

North America Wildfire Layers: RMS North America Wildfire HD Model licensees can now supplement their hazard layers with distance to the wildland-urban interface (WUI), historical fire activity, stochastic burn probability, and urban conflagration data.
For firms not licensing the models, new wildfire hazard data layers include defensible space (characterized by the distance-to-vegetation parameter), slope data, and fuel type.

Improvements to Risk Modeler Capabilities

Expanded PLT Export Capabilities for HD Modeling with Treaty Metadata Results

In this Risk Modeler release, we continue to enhance the export of Period Loss Tables (PLTs) to Results Data Module (RDM) databases by adding treaty metadata to the results. The new treaty metadata provides additional context to help you understand which losses correspond to specific treaties.

Workflow Enhancements for Marginal Impact Analysis to Help Improve Data Governance

Marginal impact analysis can help (re)insurers quantify the impact of writing additional layers to a reference portfolio. In November’s Risk Modeler release, the marginal impact analysis workflow has been enhanced to enable administrators to identify reference portfolios for underwriters. For organizations with dozens or even hundreds of underwriters, this helps ensure that marginal analysis decisions are based on a common set of analytics and that users running marginal impact analyses are all working off the same reference portfolio.

Improved Search Experience for Databases and Analysis Results

To make it easier to find analysis results, the latest Risk Modeler update enhances the Data Explorer search feature (see figure below). When a keyword is entered into the search, Data Explorer finds all the results and databases that include the keyword, making it simpler to search across your key datasets.

Figure 1: Risk Modeler Data Explorer search feature

IQ Application Preview Programs

Interested in previewing some of the new RMS underwriting solutions? We have launched a preview program to test RMS UnderwriteIQ™, RMS TreatyIQ™, and Business Hierarchies in RMS ExposureIQ™.
Each application draws on more than 30 years of risk analytics leadership and takes advantage of our industry-leading models, data products, financial modeling, and a unified data store to deliver impactful analytics across the insurance lifecycle, from primary underwriting to portfolio and cedant management. You’ll have the opportunity to test and influence the roadmap for the following:

UnderwriteIQ: Built for insurance underwriters looking to integrate RMS’s deep science and modeling into their underwriting and risk pricing strategy, UnderwriteIQ delivers flexible, powerful account-level modeling capabilities with low latencies. A cloud-native and collaborative underwriting analytics solution, it is fully integrated with Risk Modeler and other Intelligent Risk Platform applications to help you understand the marginal impact on your portfolio.

TreatyIQ: Built for reinsurance underwriters looking for deep treaty program pricing and analytics, TreatyIQ comes with advanced financial modeling and flexible pricing formulas to give underwriters actionable foresight of risk by analyzing the marginal impact, combined ratio, and return on capital for every quote. TreatyIQ’s game-changing portfolio roll-up performance for monitoring risk positions, analyzing risk drivers, and assessing pricing performance speeds up workflows, reduces costs, and improves technical pricing and profitability.

ExposureIQ Business Hierarchies: ExposureIQ today provides powerful accumulation analytics and real-time event response capabilities. With the addition of Business Hierarchies, exposure managers can get a complete, rolled-up view of exposures across multiple books and can quickly identify organization-wide exposures across both insurance and reinsurance entities to improve portfolio management decisions.
Backed by the sophisticated RMS financial model, customers will soon be able to apply complex inward and outward reinsurance structures for a true view of net reinsurance right up to the group level.

Over the next couple of weeks, customers will have the opportunity to preview these applications for feedback. If you are interested, please contact info@rms.com.

Figure 2: RMS Intelligent Risk Platform

For additional information on Risk Modeler, ExposureIQ, or any of the IQ applications on the IRP, please visit the Intelligent Risk Platform webpages. You can also access release notes on RMS OWL, the RMS client support portal.

Staff Product Management
September 30, 2022
What’s New on the RMS Intelligent Risk Platform?

RMS is constantly adding new capabilities to the RMS Intelligent Risk Platform™ (IRP), our open, modular, cloud-native digital foundation for RMS models and unified risk analytics. As the gateway to robust risk analytics and in-depth data, the IRP brings together a trusted data repository, collaborative applications, and open APIs to help deliver better business outcomes. Our powerful, cloud-native applications hosted on the IRP are designed to allow multiple users to gain insights into potential hazards, exposures, and accumulations. During September, RMS released multiple updates for both its ExposureIQ™ and Risk Modeler™ applications to give clients access to the latest technologies and innovations. This update shows how RMS is continuing to invest in the latest science, technology, and data by providing an in-depth look at the new features and a brief overview of our improvements to existing capabilities.

In the IRP September releases, we added new features to Risk Modeler, our cloud-native catastrophe modeling application that unifies all our clients’ risk modeling needs for greater workload and cost efficiency:

- Marginal Impact Analyses: Quantify the impact of writing additional layers to a portfolio. (New feature)
- Climate Change Model Analyses for the RMS U.S. Inland Flood HD Model: Expanded model profile options that apply the climate change model to a broader range of existing analyses. (Improvement)
In addition, we made several big improvements to existing capabilities of Risk Modeler and ExposureIQ, including:

- Results Data Module (RDM) Schema Updates: Transact with High Definition (HD) model results with greater ease (Risk Modeler)
- New Risk Modeler APIs: New and updated services are now available via APIs (Risk Modeler)
- More Geocoding Resolutions for Geopolitical and Geopolitical Spider Accumulations (ExposureIQ)

NEW FEATURES

Risk Modeler: Marginal Impact Analyses

An individual policy may appear to be a good risk based on your underwriting guidelines, but you may think twice about insuring it if you see how it contributes to the overall portfolio risk. In the September update, Risk Modeler added the ability to run a marginal impact analysis to quantify the impact of writing additional layers to your reference portfolio. Marginal impact analysis is available for the following financial perspectives:

- Client Loss
- Facultative Reinsurance Loss
- Gross Loss
- Ground Up Loss
- Net Loss Pre-Cat
- Quota Share Treaty Loss
- Reinsurance Gross Loss
- Reinsurance Net Loss
- Surplus Share Treaty Loss
- Working Excess

BENEFITS

Marginal impact analysis allows users to better understand the risk drivers within a portfolio, to improve profitability and deliver effective capital allocation.

Risk Modeler: Climate Change Model Analyses for the U.S. Inland Flood HD Model

RMS now has the largest portfolio of climate change catastrophe risk models, which includes North Atlantic Hurricane, North America Wildfire, U.S. Inland Flood, Japan Typhoon and Flood, Europe Inland Flood, and Europe Windstorm perils. In the September update, RMS expanded the capability to run climate change model analyses on U.S. Inland Flood HD Model results. Users can now apply climate change model analysis to a broader range of existing analyses and ensure that the application of climate change is consistent with the assumptions used in the development of the event mapping files.
This update expands the High Definition (HD) profile settings to support additional simulation sets, 800,000 simulation periods, and sample counts greater than one.

BENEFITS

Build a more consistent view of a book of business by aligning climate change model parameters with a reference view of flood and hurricane risk.

UPDATES

Risk Modeler: Results Data Module (RDM) Schema Updates Make It Easier to Transact with HD Model Results

The latest Risk Modeler release updates the HD model RDM schema so that users can store and share results more easily at different granularities with their internal and external stakeholders.

BENEFITS

The new RDM schema facilitates sharing HD results with other parties, helps consolidate storage for HD and DLM/ALM loss results, and simplifies querying loss results using SQL scripts.

Risk Modeler: New and Updated APIs

Risk Modeler currently supports 380 API operations. The September 2022 release updates the RDM export and earthquake hazard lookup APIs and adds a new service for marginal impact analyses. For details, see the changelog on OWL.

BENEFITS

Automate key workflows across applications using standardized, pre-defined processes.

ExposureIQ Update: New Geocoding Resolutions for Geopolitical and Geopolitical Spider Accumulations

ExposureIQ enables users to visualize and report on global exposure concentrations and hotspots. With a range of accumulation reports available, users can drill down to detailed geographic resolutions, helping to quickly identify key drivers of loss. RMS has expanded the set of geocoding resolutions at which users can run a geopolitical analysis or geopolitical spider accumulation, offering more geographical granularity for analysis. For geopolitical spider accumulations, users can now specify more geographic areas, including CRESTA and city level.
In addition, users can select from the expanded list of resolutions at which to calculate losses, such as running accumulations for every city within a given country. The table below summarizes all the types of accumulations currently supported by ExposureIQ.

Geopolitical
- Function: Computes exposure concentrations by applying a set of damage factors to regions specified at some level of geographic granularity.
- Sample use case: Find the total exposure by postal code region.

Hazard
- Function: Computes exposure concentrations by applying a set of damage factors to regions defined by hazard layers.
- Sample use case: Find the exposure for a theoretical loss scenario based on different damage levels realized for each depth band of a flood hazard map.

Event Response
- Function: Computes exposure concentrations by applying a set of damage factors to regions defined by event files representing real-world events.
- Sample use case: Find the exposure to an actual historic event based on different damage levels realized for each intensity band in the event footprint.

Circle Spider
- Function: Locates circular areas of a fixed diameter containing the highest level of exposure by applying band-specific damage factors to all exposures in up to three damage bands.
- Sample use case: Find the top 100-meter circles worth of total exposure within a specified search region.

Geopolitical Spider
- Function: Locates geopolitical regions of a specified granularity that contain the highest level of exposure within the boundaries of a broader region.
- Sample use case: Find the Admin1 regions that have the highest total exposure within a set of countries.

BENEFITS

Identify key loss drivers and more focused areas of exposure concentrations and hotspots.

For additional information on Risk Modeler, ExposureIQ, or the IRP, you can visit the Risk Modeler, ExposureIQ, or Intelligent Risk Platform webpages. You can also access release notes on OWL, the RMS client support portal.
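The accumulation types above share a common pattern: apply damage factors to exposures grouped by some region definition, then rank the regions by the result. As a rough, hypothetical sketch of that pattern only (not the actual ExposureIQ implementation), a geopolitical-style accumulation could look like this in Python; the region names, exposure values, and damage factors are made up:

```python
# Illustrative sketch of a geopolitical-style accumulation:
# apply a per-region damage factor to each exposure, sum by region,
# and rank regions by accumulated damaged exposure.
from collections import defaultdict

def accumulate_by_region(exposures, damage_factors, top_n=None):
    """exposures: list of (region, total_insured_value) tuples.
    damage_factors: dict mapping region -> factor in [0, 1].
    Returns regions sorted by damaged exposure, highest first."""
    totals = defaultdict(float)
    for region, tiv in exposures:
        totals[region] += tiv * damage_factors.get(region, 0.0)
    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:top_n] if top_n else ranked

# Hypothetical portfolio: (Admin1 region, total insured value)
exposures = [
    ("Florida", 5_000_000), ("Florida", 2_000_000),
    ("Texas", 4_000_000), ("California", 1_000_000),
]
factors = {"Florida": 0.10, "Texas": 0.05, "California": 0.02}

# Top two regions by damaged exposure
print(accumulate_by_region(exposures, factors, top_n=2))
# → [('Florida', 700000.0), ('Texas', 200000.0)]
```

A geopolitical spider accumulation would apply the same idea within the boundaries of a broader region, ranking sub-regions (for example, Admin1 within a set of countries) by their accumulated totals.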

Alexandria Julius
March 03, 2022
How to Interface with Risk Modeler APIs within Microsoft’s Power Query to Work with Your Risk Data within Tools Like Excel and Power BI

Power Query is an ETL (extract, transform, load) tool native to a range of Microsoft products, including Excel and Power BI. It enables rapid import of data from multiple sources and cleaning of that data right in Excel. The Power Query Editor’s user-friendly interface can be used to interactively import and transform data using the underlying “M” query execution engine, which runs queries expressed in the Power Query formula language (“M”). When you create a new import or transformation by interacting with the components of the Power Query interface, M code is generated automatically. Alternatively, users can write M code directly.

Power Query offers many data source options for imports, such as files or databases, but the option to import data from the web is of particular interest because this mechanism can be used to pull data directly from RMS Intelligent Risk Platform REST APIs.

Prerequisite

For this tutorial to work, you’ll need:

- Microsoft Excel 2010 or more recent on a Windows machine

Import Data from the Web via Power Query

Let’s take the use case where you want to import location details from Risk Modeler into Excel. To illustrate the process, let’s start by stepping through importing data through the user interface.

1. Within Excel, go to the Data menu ribbon and click From Web in the Get & Transform Data menu.
2. Create an Advanced request, with the URL parts set to the GET request you’d like to make and the Authorization header set to your API key.
3. Click OK to make the request. This returns a preview of the imported data.
4. Navigate to the property record by double-clicking Record.
5. Convert the query results into a table by clicking Into Table.

The steps we just walked through are listed as Applied Steps on the right. You can perform additional transformation steps on the data here in the Power Query Editor.

Bring the Power Query Results into Excel

Once you are comfortable with your table, click Close & Load beneath the Home menu ribbon.
Power Query will bring your results into Excel, and the data may be viewed and manipulated as normal.

Edit the Query in the Advanced Editor

To see the M code generated behind the user interface, reopen the Power Query Editor and select Advanced Editor from the Query menu. The same steps listed under Applied Steps are reflected in the code, and you can edit the code in the Advanced Editor.

Import Data from an Advanced Query

The user interface was sufficient for the single API GET request, but you also have the option to write a query directly to handle more complex use cases. Let’s write an advanced query to obtain a security token and use that token to make the same Risk Modeler API request that was made above. Within Excel, go to the Data menu ribbon, click Blank Query, and then open the Advanced Editor. Insert the following example:

```
let
    url = "https://api-euw1.rms-ppe.com/sml/auth/v1/login/implicit",
    body = "{ ""tenantName"": ""<RI tenant name>"", ""username"": ""<RI User Name>"", ""password"": ""<RI Password>"" }",
    // Obtain a bearer token
    GetJson = Web.Contents(url,
        [
            Headers = [#"Content-Type"="application/json"],
            Content = Text.ToBinary(body)
        ]
    ),
    FormatAsJson = Json.Document(GetJson),
    // Get the token from the JSON response
    Access_Token = FormatAsJson[accessToken],
    AccessTokenHeader = "Bearer " & Access_Token,
    Source = Json.Document(
        Web.Contents("https://api-euw1.rms-ppe.com/riskmodeler/v1/locations/2/?datasource=s15",
            [Headers=[#"Authorization"=AccessTokenHeader]])),
    property = Source[property]
in
    property
```

Replace tenantName with the subdomain name you use to access the RMS Intelligent Risk Platform. For example, if you use the URL https://mytenant.rms.com, the tenantName is “mytenant.” Replace the username and password with the credentials for an RMS Intelligent Risk Platform user account. Click Done.

You may be prompted on how to authenticate. Click Edit Permissions and use Anonymous, leaving the URL as populated. You may also be prompted about data privacy; you may select Ignore Privacy Level checks and Save. This allows the connection to be made with the retrieved access token.

Bring the Power Query Results into Excel

Once you are comfortable with your table, click Close & Load beneath the Home menu ribbon. Power Query will bring your results into Excel, and the data may be viewed and manipulated as normal. To refresh the data, click Refresh within the Queries & Connections section of the Data menu ribbon.

Save your Excel file. When you open the file or refresh it, a new access token will be retrieved to use for the query.

Writing queries that automate data import and transformation can significantly reduce time spent extracting data from Risk Modeler and manually importing and manipulating it in Excel, while also eliminating common copy-and-paste errors. The two samples above both quickly imported the same location data, and they show how easy it is to get started using Risk Modeler APIs right in Excel.
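If you prefer scripting outside of Excel, the same two-step flow (log in for a bearer token, then request the location record) can be sketched in Python using only the standard library. The endpoints and the accessToken field mirror the M query above; the tenant name, credentials, location ID, and datasource are placeholders you would substitute with your own values:

```python
import json
import urllib.request

BASE = "https://api-euw1.rms-ppe.com"

def bearer_header(token_response):
    """Build the Authorization header from the login response body,
    mirroring the AccessTokenHeader step in the M query."""
    return {"Authorization": "Bearer " + token_response["accessToken"]}

def _post_json(url, payload):
    # POST a JSON body and parse the JSON response
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

def get_location(tenant, username, password, location_id=2, datasource="s15"):
    # Step 1: obtain a bearer token (same endpoint as the M query above)
    token = _post_json(
        BASE + "/sml/auth/v1/login/implicit",
        {"tenantName": tenant, "username": username, "password": password},
    )
    # Step 2: request the location record with the token
    req = urllib.request.Request(
        f"{BASE}/riskmodeler/v1/locations/{location_id}/?datasource={datasource}",
        headers=bearer_header(token),
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)["property"]
```

This is a sketch rather than production code: a real script would add error handling for failed logins and expired tokens, just as the Power Query refresh does by fetching a fresh token on each run.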

Anish Patel
February 02, 2022
How You Can Quickly and Reliably Upload Large Databases to Data Bridge Using Multi-Part Uploads

When you’re using the cloud to store your data, you need to ensure that you can transfer your data up to the cloud both quickly and reliably. Data can be transferred to Data Bridge in a single-stream upload by requesting a pre-signed upload URL and then uploading to it before calling import; a guide for this has been created here. This process is fine for relatively small databases, but when uploads start becoming large (100 MB, according to AWS), you should consider using multi-part uploads for transferring data to the cloud. RMS recommends that you use multi-part uploads for all integration projects. While this process may be more complicated than a single upload stream, it has a host of benefits:

Greater throughput utilization: In a single stream, certain parts of the network may prevent you from utilizing your full network capacity. By sending multiple streams of data, you are able to utilize more of your network capacity.

Better recovery from network issues: Networks fail all the time, and often a simple retry is all that is needed to get going again. But if you’re transferring a 10 GB database that fails after 9 GB has been transferred, you’re going to have to upload the entire 10 GB again.
If you use multi-part uploads, you upload chunks individually to the cloud, meaning that if any individual upload fails, only that chunk needs to be re-uploaded.

This is a three-step process:

1. Generate an upload from Data Bridge: /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/init-upload
2. Break the file down into chunks and iterate over the following process, starting at part number 1:
   - Get a pre-signed S3 URL to upload the part to: /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}
   - Upload the file chunk to S3 using a PUT request to the pre-signed URL, with the contents of the file sent as an octet-stream.
3. Once the entire file has been chunked and uploaded, tell Data Bridge that you have completed uploading the file by issuing a POST request to /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/complete-upload

In this tutorial, we omit a couple of things that you should consider when implementing in production. For example:

- Multi-threaded streaming uploads can increase the throughput of uploads.
- Failure recovery and exception handling ensure that if one chunk fails, it is retried so that the entire process does not fail.

C# Tutorial: This tutorial has been created using .NET Core 3.1 and should work for .NET 5. The only external dependency we’re using in this walkthrough is Newtonsoft.Json, which can be added as a NuGet dependency.

Python Tutorial: This tutorial requires one external dependency to be installed, the requests library, which can be pip installed. This sample has been built using Python 3.9.

Generate an Upload ID from Data Bridge

Before you get started with uploading your EDM to the RMS Intelligent Risk Platform™ using multi-part upload, you must first generate an upload ID from Data Bridge.
In order to do so, you'll need a SQL instance name to identify which SQL instance to upload the database to. This can be found by querying /databridge/v1/sql-instances and determining which of the returned results to send it to. In this case, we're going to assume there is only one SQL instance returned and we'll use that. The request is as follows:

C#

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        string baseUrl = "https://api-euw1.rms.com";
        string apiKey = "<<YOUR API KEY HERE>>";
        client.BaseAddress = new Uri(baseUrl);
        // Set the auth API key
        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(apiKey);
        var sqlInstance = await GetFirstSqlInstance(client);
    }
}

private static async Task<string> GetFirstSqlInstance(HttpClient client)
{
    using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "databridge/v1/sql-instances"))
    {
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        using (HttpResponseMessage response = await client.SendAsync(request))
        {
            if (response.IsSuccessStatusCode)
            {
                using (HttpContent content = response.Content)
                {
                    var jsonString = await content.ReadAsStringAsync();
                    var jsonResponse = JArray.Parse(jsonString);
                    return jsonResponse[0]["name"].ToString();
                }
            }
            throw new Exception("Unable to get SQL instance names. HTTP status code: " + response.StatusCode);
        }
    }
}

Python

import requests
import time

def get_auth_header(auth_key):
    headers = {"Authorization": auth_key}
    return headers

def get_sql_instance(base_url, headers):
    url = f"{base_url}/databridge/v1/sql-instances"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()[0]["name"]

if __name__ == "__main__":
    base_url = "https://api-euw1.rms.com"
    api_key = "xxx"
    auth_header = get_auth_header(api_key)
    sql_instance = get_sql_instance(base_url, auth_header)

Now that you've retrieved your SQL instance, you can construct the request to generate an upload ID from Data Bridge using the following path: /databridge/v1/sql-instances/{instanceName}/databases/{databaseName}/{fileExtension}/init-upload. You'll notice a couple of other things you're going to need to add into the URL path: databaseName and fileExtension. The databaseName is the name you'd like to give the database once it's been uploaded to Data Bridge. The fileExtension indicates the type of file you're sending up; the options are mdf, bak, and dacpac. For our purposes, we're going to send up a .mdf file.
C#

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        // Previous steps omitted here
        var edmName = "my_edm";
        var fileExtensionLiteral = "mdf";
        var uploadId = await GenerateUploadId(client, sqlInstance, fileExtensionLiteral, edmName);
    }
}

private static async Task<string> GenerateUploadId(HttpClient client, string sqlInstance, string fileExtension, string databaseName)
{
    using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload", null))
    {
        if (response.IsSuccessStatusCode)
        {
            using (HttpContent content = response.Content)
            {
                var jsonString = await content.ReadAsStringAsync();
                var jsonResponse = JObject.Parse(jsonString);
                return jsonResponse["uploadId"].ToString();
            }
        }
        throw new Exception("Unable to get upload ID. HTTP status code: " + response.StatusCode);
    }
}

Python

def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    return response.json()["uploadId"]

if __name__ == "__main__":
    # Previous steps omitted here
    database_name = "my_python_edm"
    file_extension = "mdf"
    upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance)

Now that you have your upload ID, you can start chunking up the file and sending the parts of the file up.

Break Down the File Into Chunks and Upload Each Chunk Individually

This is the step where the majority of the work takes place. In C#, you're going to start by breaking the file into chunks using a FileStream, reading a specified chunk of the file into a buffer before uploading that chunk.
C#

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        // Previous steps omitted here
        var localFilePath = "my_edm.mdf";
        var bufferSizeInBytes = 20 * 1024 * 1024;
        string fileExtensionLiteral = "mdf";
        var etags = await UploadFilesUsingMultiPartUpload(client, edmName, localFilePath, fileExtensionLiteral, bufferSizeInBytes, sqlInstance, uploadId);
    }
}

private static async Task<Dictionary<string, string>> UploadFilesUsingMultiPartUpload(HttpClient client, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId)
{
    using (FileStream fileStream = File.OpenRead(localFilePath))
    {
        var partNumber = 1;
        var etags = new Dictionary<string, string>();
        while (true)
        {
            var buffer = new byte[bufferSizeInBytes];
            // Read a chunk from the file the size of the buffer
            var bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            if (bytesRead == 0)
            {
                break;
            }
            // Get partial upload URL
            var uploadUrl = await GetPresignedPartialURL(client, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber);
            // Upload chunk to URL
            etags.Add(partNumber.ToString(), await UploadByteArray(buffer, bytesRead, uploadUrl));
            partNumber++;
        }
        return etags;
    }
}

private async static Task<string> GetPresignedPartialURL(HttpClient client, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber)
{
    using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, $"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"))
    {
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        using (HttpResponseMessage response = await client.SendAsync(request))
        {
            using (HttpContent content = response.Content)
            {
                var url = await content.ReadAsStringAsync();
                return url.Trim('"');
            }
        }
    }
}

private async static Task<string> UploadByteArray(byte[] buffer, int length, string uploadUrl)
{
    using (HttpClient client = new HttpClient())
    {
        using (ByteArrayContent content = new ByteArrayContent(buffer, 0, length))
        {
            content.Headers.Add("Content-Type", "application/octet-stream");
            using (HttpResponseMessage response = await client.PutAsync(uploadUrl, content))
            {
                if (response.IsSuccessStatusCode)
                {
                    return response.Headers.ETag.Tag.Trim('"');
                }
                throw new Exception("Unable to upload chunk. HTTP status code: " + response.StatusCode);
            }
        }
    }
}

Python

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    etags = {}
    with open(local_file_path, 'rb') as db_file:
        chunk = db_file.read(chunk_size_bytes)
        part_number = 1
        while chunk:
            url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number)
            etag = upload_chunk_to_url(url, chunk)
            etags[str(part_number)] = etag
            chunk = db_file.read(chunk_size_bytes)
            part_number = part_number + 1
    return etags

def upload_chunk_to_url(url, chunk):
    headers = {"Content-Type": "application/octet-stream"}
    response = requests.put(url, headers=headers, data=chunk)
    response.raise_for_status()
    return response.headers["ETag"].strip('"')

def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.text

if __name__ == "__main__":
    # Previous steps omitted here
    chunk_size = 20 * 1024 * 1024
    local_path = "my_python_edm.mdf"
    etags = upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id)

In the above sample, it's worth pointing out a couple of things. The first is that you're iterating over blocks of the file: if the file is 100 MB in size and you're breaking it into 20 MB chunks, you should expect this loop to run 5 times.
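That loop count is just ceiling division of the file size by the chunk size. A one-line helper (illustrative only, not part of the tutorial code) makes the arithmetic explicit:

```python
import math

def expected_part_count(file_size_bytes, chunk_size_bytes):
    # Number of loop iterations needed to cover the whole file
    return math.ceil(file_size_bytes / chunk_size_bytes)
```

So a 100 MB file with 20 MB chunks yields 5 parts, while a 101 MB file would need a sixth, partial chunk.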
Don't forget to include a break clause in the loop once you've reached the end of the file; otherwise, you're going to run into an infinite loop. You read a section of the file into a buffer byte array, then get a pre-signed URL from the Data Bridge API telling you where to PUT the chunk of the file. Once the URL has been retrieved, you upload the byte array to it. Note that you're using an octet-stream content type: this is used to transmit binary content, which is what the byte arrays are. The last thing to note is the dictionary called etags that you're caching responses into. You collect an ETag from each uploaded part so you can pass them all to the next phase of the workflow: issuing a complete statement.

Issue Upload Complete Statement

You're now onto the last step in the multi-part upload: issuing a complete-upload request.

C#

public static async Task Main(string[] args)
{
    using (HttpClient client = new HttpClient())
    {
        // Previous steps omitted here
        await CompleteUpload(client, databaseName, fileExtension, sqlInstanceName, uploadId, etags);
        await AttachDatabase(client, sqlInstance, edmName, /*MDF = 0, BAK = 1*/ 0);
    }
}

private static async Task CompleteUpload(HttpClient client, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags)
{
    using (HttpContent payload = new StringContent(
        string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags)),
        Encoding.UTF8, "application/json"))
    {
        using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload", payload))
        {
            if (!response.IsSuccessStatusCode)
            {
                throw new Exception("Unable to complete upload. HTTP status code: " + response.StatusCode);
            }
        }
    }
}

private static async Task AttachDatabase(HttpClient client, string instanceName, string edmName, int fileFormat)
{
    using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{instanceName}/Databases/{edmName}/import?importFrom={fileFormat}", null))
    {
        if (response.IsSuccessStatusCode)
        {
            using (HttpContent content = response.Content)
            {
                var jsonString = await content.ReadAsStringAsync();
                var jsonResponse = JObject.Parse(jsonString);
                string jobId = jsonResponse["jobId"].ToString();
                // Poll until the job is complete
                await PollJobStatus(client, jobId);
            }
        }
    }
}

private static async Task PollJobStatus(HttpClient client, string jobId, int sleepIntervalInMilliseconds = 5000, int maxWaitTimeInMilliseconds = 300000)
{
    string status;
    int totalWaitTime = 0;
    while (totalWaitTime < maxWaitTimeInMilliseconds)
    {
        // Query the Job API
        using (HttpResponseMessage response = await client.GetAsync($"databridge/v1/Jobs/{jobId}"))
        {
            using (HttpContent content = response.Content)
            {
                status = await content.ReadAsStringAsync();
                if (status == "Succeeded")
                {
                    break;
                }
                Thread.Sleep(sleepIntervalInMilliseconds);
                totalWaitTime += sleepIntervalInMilliseconds;
            }
        }
    }
}

Python

def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id):
    # Previous steps omitted
    complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload"
    payload = {"uploadId": upload_id, "etags": etags}
    response = requests.post(complete_url, headers=headers, json=payload)
    response.raise_for_status()

def attach_database(base_url, headers, sql_instance, database_name, file_type):
    url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/Databases/{database_name}/import?importFrom={file_type}"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    job_id = response.json()["jobId"]
    status = 'InProgress'
    total_wait_time = 0
    while total_wait_time < 300:
        response = requests.get(f"{base_url}/databridge/v1/Jobs/{job_id}", headers=headers)
        status = response.text
        if status != 'Succeeded':
            time.sleep(5)
            total_wait_time += 5
        else:
            break

You've seen several requests to the Data Bridge API at this stage, so there isn't much new here besides the inclusion of the etags payload. You add the uploadId and the ETags that were returned for each uploaded chunk and send them as part of the payload to Data Bridge. The full sample code to achieve this multi-part upload in C# is below.
## Complete C# Sample Code

C#

using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace UploadSample
{
    public class MultiPartUploadProgram
    {
        public enum FileType { MDF = 0, BAK = 1, DACPAC = 2 }

        public static async Task Main(string[] args)
        {
            using (HttpClient client = new HttpClient())
            {
                FileType fileExtension;
                string baseUrl = "https://api-euw1.rms.com";
                string apiKey = args[0];
                var edmName = args[1];
                var localFilePath = args[2];
                // Override baseUrl if defined
                if (args.Length > 3)
                {
                    baseUrl = args[3];
                }
                client.BaseAddress = new Uri(baseUrl);
                var bufferSizeInBytes = 20 * 1024 * 1024;
                if (localFilePath.EndsWith("mdf", true, CultureInfo.InvariantCulture))
                {
                    fileExtension = FileType.MDF;
                }
                else if (localFilePath.EndsWith("bak", true, CultureInfo.InvariantCulture))
                {
                    fileExtension = FileType.BAK;
                }
                else
                {
                    Console.WriteLine("Invalid file extension. Supported extensions are .mdf and .bak");
                    return;
                }
                string fileExtensionLiteral = Enum.GetName(typeof(FileType), fileExtension).ToLowerInvariant();
                // Set the auth API key
                client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(apiKey);
                var sqlInstance = await GetSqlInstance(client);
                var uploadId = await GenerateUploadId(client, sqlInstance, fileExtensionLiteral, edmName);
                var etags = await UploadFilesUsingMultiPartUpload(client, edmName, localFilePath, fileExtensionLiteral, bufferSizeInBytes, sqlInstance, uploadId);
                await CompleteUpload(client, edmName, fileExtensionLiteral, sqlInstance, uploadId, etags);
                await AttachDatabase(client, sqlInstance, edmName, fileExtension);
            }
        }

        private static async Task AttachDatabase(HttpClient client, string instanceName, string edmName, FileType fileFormat)
        {
            int fileExtensionValue = (int)fileFormat;
            using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{instanceName}/Databases/{edmName}/import?importFrom={fileExtensionValue}", null))
            {
                if (response.IsSuccessStatusCode)
                {
                    using (HttpContent content = response.Content)
                    {
                        var jsonString = await content.ReadAsStringAsync();
                        var jsonResponse = JObject.Parse(jsonString);
                        string jobId = jsonResponse["jobId"].ToString();
                        // Poll until the job is complete
                        await PollJobStatus(client, jobId);
                    }
                }
            }
        }

        private static async Task PollJobStatus(HttpClient client, string jobId, int sleepIntervalInMilliseconds = 5000, int maxWaitTimeInMilliseconds = 300000)
        {
            string status;
            int totalWaitTime = 0;
            while (totalWaitTime < maxWaitTimeInMilliseconds)
            {
                // Query the Job API
                using (HttpResponseMessage response = await client.GetAsync($"databridge/v1/Jobs/{jobId}"))
                {
                    using (HttpContent content = response.Content)
                    {
                        status = await content.ReadAsStringAsync();
                        if (status == "Succeeded")
                        {
                            break;
                        }
                        Thread.Sleep(sleepIntervalInMilliseconds);
                        totalWaitTime += sleepIntervalInMilliseconds;
                    }
                }
            }
        }

        private static async Task<Dictionary<string, string>> UploadFilesUsingMultiPartUpload(HttpClient client, string databaseName, string localFilePath, string fileExtension, int bufferSizeInBytes, string sqlInstanceName, string uploadId)
        {
            using (FileStream fileStream = File.OpenRead(localFilePath))
            {
                var partNumber = 1;
                var etags = new Dictionary<string, string>();
                while (true)
                {
                    var buffer = new byte[bufferSizeInBytes];
                    // Read a chunk from the file the size of the buffer
                    var bytesRead = fileStream.Read(buffer, 0, buffer.Length);
                    if (bytesRead == 0)
                    {
                        break;
                    }
                    // Get partial upload URL
                    var uploadUrl = await GetPresignedPartialURL(client, databaseName, fileExtension, sqlInstanceName, uploadId, partNumber);
                    // Upload chunk to URL
                    etags.Add(partNumber.ToString(), await UploadByteArray(buffer, bytesRead, uploadUrl));
                    partNumber++;
                }
                return etags;
            }
        }

        private async static Task<string> GetPresignedPartialURL(HttpClient client, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, int partNumber)
        {
            using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, $"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/upload-part/{uploadId}/{partNumber}"))
            {
                request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                using (HttpResponseMessage response = await client.SendAsync(request))
                {
                    using (HttpContent content = response.Content)
                    {
                        var url = await content.ReadAsStringAsync();
                        return url.Trim('"');
                    }
                }
            }
        }

        private async static Task<string> UploadByteArray(byte[] buffer, int length, string uploadUrl)
        {
            using (HttpClient client = new HttpClient())
            {
                using (ByteArrayContent content = new ByteArrayContent(buffer, 0, length))
                {
                    content.Headers.Add("Content-Type", "application/octet-stream");
                    using (HttpResponseMessage response = await client.PutAsync(uploadUrl, content))
                    {
                        if (response.IsSuccessStatusCode)
                        {
                            return response.Headers.ETag.Tag.Trim('"');
                        }
                        throw new Exception("Unable to upload chunk. HTTP status code: " + response.StatusCode);
                    }
                }
            }
        }

        private static async Task CompleteUpload(HttpClient client, string databaseName, string fileExtension, string sqlInstanceName, string uploadId, Dictionary<string, string> etags)
        {
            using (HttpContent payload = new StringContent(string.Format("{{\"uploadId\": \"{0}\" , \"etags\": {1}}}", uploadId, JsonConvert.SerializeObject(etags)), Encoding.UTF8, "application/json"))
            {
                using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstanceName}/databases/{databaseName}/{fileExtension}/complete-upload", payload))
                {
                    if (!response.IsSuccessStatusCode)
                    {
                        throw new Exception("Unable to complete upload. HTTP status code: " + response.StatusCode);
                    }
                }
            }
        }

        private static async Task<string> GenerateUploadId(HttpClient client, string sqlInstance, string fileExtension, string databaseName)
        {
            using (HttpResponseMessage response = await client.PostAsync($"databridge/v1/sql-instances/{sqlInstance}/databases/{databaseName}/{fileExtension}/init-upload", null))
            {
                if (response.IsSuccessStatusCode)
                {
                    using (HttpContent content = response.Content)
                    {
                        var jsonString = await content.ReadAsStringAsync();
                        var jsonResponse = JObject.Parse(jsonString);
                        return jsonResponse["uploadId"].ToString();
                    }
                }
                throw new Exception("Unable to get upload ID. HTTP status code: " + response.StatusCode);
            }
        }

        private static async Task<string> GetSqlInstance(HttpClient client)
        {
            using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "databridge/v1/sql-instances"))
            {
                request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                using (HttpResponseMessage response = await client.SendAsync(request))
                {
                    if (response.IsSuccessStatusCode)
                    {
                        using (HttpContent content = response.Content)
                        {
                            var jsonString = await content.ReadAsStringAsync();
                            var jsonResponse = JArray.Parse(jsonString);
                            return jsonResponse[0]["name"].ToString();
                        }
                    }
                    throw new Exception("Unable to get SQL instance names. HTTP status code: " + response.StatusCode);
                }
            }
        }
    }
}
HTTP status Code:" + response.StatusCode); } } } } } Copy to The full sample code to achieve this multi-part upload in Python is below. ## Complete Python Sample Code Copy to Python import requests import time def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id): etags = {} with open(local_file_path, 'rb') as db_file: chunk = db_file.read(chunk_size_bytes) part_number = 1 while chunk: url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number) etag = upload_chunk_to_url(url, chunk) etags[str(part_number)] = etag chunk = db_file.read(chunk_size_bytes) part_number = part_number + 1 complete_url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload" payload = { "uploadId": upload_id, "etags": etags } response = requests.post(complete_url, headers=headers, json=payload) response.raise_for_status() def upload_chunk_to_url(url, chunk): headers = { "Content-Type": "application/octet-stream" } response = requests.put(url, headers=headers, data=chunk) response.raise_for_status() return response.headers["Etag"].strip('\"') def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number): url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}" response = requests.get(url, headers=headers) response.raise_for_status() return response.text def get_sql_instance(base_url, headers): url = f"{base_url}/databridge/v1/sql-instances" response = requests.get(url, headers=headers) response.raise_for_status() return response.json()[0]["name"] def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance): url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload" 
response = requests.post(url, headers=headers) response.raise_for_status() return response.json()["uploadId"] def attach_database(base_url,headers, sql_instance, database_name, fileType): url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/Databases/{database_name}/import?importFrom={fileType}" response = requests.post(url, headers=headers) response.raise_for_status() jobId = response.json()["jobId"] status = 'InProgress' totalWaitTime = 0 while(totalWaitTime < 300): response = requests.get(f"{base_url}/databridge/v1/Jobs/{jobId}", headers=headers) status = response.text if status != 'Succeeded': time.sleep(5) totalWaitTime += 5 else: break def get_auth_header(auth_key): headers = { "Authorization": auth_key } return headers if __name__ == "__main__": base_url = "https://api-euw1.rms.com" api_key = "***" database_name = "my_python_edm" local_path = "my_python_edm.mdf" file_extension = "mdf" chunk_size = 20 * 1024 * 1024 auth_header = get_auth_header(api_key) sql_instance = get_sql_instance(base_url, auth_header) upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance) upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id) attach_database(base_url, auth_header, sql_instance, database_name, 0) import requests import time def upload_files_using_multi_part_upload(base_url, headers, database_name, local_file_path, file_extension, chunk_size_bytes, sql_instance, upload_id): etags = {} with open(local_file_path, 'rb') as db_file: chunk = db_file.read(chunk_size_bytes) part_number = 1 while chunk: url = get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number) etag = upload_chunk_to_url(url, chunk) etags[str(part_number)] = etag chunk = db_file.read(chunk_size_bytes) part_number = part_number + 1 complete_url = 
f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/complete-upload" payload = { "uploadId": upload_id, "etags": etags } response = requests.post(complete_url, headers=headers, json=payload) response.raise_for_status() def upload_chunk_to_url(url, chunk): headers = { "Content-Type": "application/octet-stream" } response = requests.put(url, headers=headers, data=chunk) response.raise_for_status() return response.headers["Etag"].strip('\"') def get_presigned_url(base_url, headers, database_name, file_extension, sql_instance, upload_id, part_number): url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/upload-part/{upload_id}/{part_number}" response = requests.get(url, headers=headers) response.raise_for_status() return response.text def get_sql_instance(base_url, headers): url = f"{base_url}/databridge/v1/sql-instances" response = requests.get(url, headers=headers) response.raise_for_status() return response.json()[0]["name"] def generate_upload_id(base_url, headers, database_name, file_extension, sql_instance): url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/databases/{database_name}/{file_extension}/init-upload" response = requests.post(url, headers=headers) response.raise_for_status() return response.json()["uploadId"] def attach_database(base_url,headers, sql_instance, database_name, fileType): url = f"{base_url}/databridge/v1/sql-instances/{sql_instance}/Databases/{database_name}/import?importFrom={fileType}" response = requests.post(url, headers=headers) response.raise_for_status() jobId = response.json()["jobId"] status = 'InProgress' totalWaitTime = 0 while(totalWaitTime < 300): response = requests.get(f"{base_url}/databridge/v1/Jobs/{jobId}", headers=headers) status = response.text if status != 'Succeeded': time.sleep(5) totalWaitTime += 5 else: break def get_auth_header(auth_key): headers = { "Authorization": auth_key } return headers if __name__ 
== "__main__": base_url = "https://api-euw1.rms.com" api_key = "***" database_name = "my_python_edm" local_path = "my_python_edm.mdf" file_extension = "mdf" chunk_size = 20 * 1024 * 1024 auth_header = get_auth_header(api_key) sql_instance = get_sql_instance(base_url, auth_header) upload_id = generate_upload_id(base_url, auth_header, database_name, file_extension, sql_instance) upload_files_using_multi_part_upload(base_url, auth_header, database_name, local_path, file_extension, chunk_size, sql_instance, upload_id) attach_database(base_url, auth_header, sql_instance, database_name, 0) Copy to
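The chunked read loop in the samples produces one ETag per part. As a quick sanity check before calling complete-upload, the number of parts the server should expect can be computed from the file size. This helper is our own addition for illustration, not part of the DataBridge samples:

```python
import math


def expected_part_count(file_size_bytes: int, chunk_size_bytes: int) -> int:
    """Number of parts a multi-part upload will produce for a file.

    Mirrors the read loop above: each iteration consumes up to
    chunk_size_bytes, and a final short read yields the last part.
    """
    if file_size_bytes <= 0:
        return 0
    return math.ceil(file_size_bytes / chunk_size_bytes)


# A 50 MB EDM file with the samples' 20 MB chunk size needs 3 parts.
print(expected_part_count(50 * 1024 * 1024, 20 * 1024 * 1024))  # → 3
```

Comparing `len(etags)` against this value before the complete-upload call is a cheap way to catch a truncated read.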

Brian Shek
January 19, 2022
Automating Your Risk Modeler Workflows with REST APIs

Risk Modeler provides a rich set of capabilities for data access and automation via its REST API. It provides access to the data objects that make up exposure (portfolios, accounts, locations, etc.) and results (metrics and loss tables), as well as the processing functionality of Risk Modeler (geocoding, running analyses, import, export, workflows, etc.). The REST APIs are simple to use and well documented; however, developing workflows and business rules to automate Risk Modeler in a way that suits your business goals will depend on your individual implementation. This post aims to provide insight into how to develop workflows and automations on top of the Risk Modeler APIs.

A Simple End-to-End Workflow

Let's consider a simple straight-through workflow that takes MRI files, imports them, geocodes them, and runs a DLM analysis on the imported exposure. If one were to make a diagram of this workflow, it might look something like the below workflow: The above diagram looks simple enough, until you start to factor in some of the complexity involved in the individual steps. The first step, for instance, Import MRI, involves the following process:

- Create a storage bucket in AWS
- Upload the Acc file
  - Get S3 upload details
  - Upload the file to S3
- Upload the Loc file
  - Get S3 upload details
  - Upload the file to S3
- Upload the Mapping file
  - Get S3 upload details
  - Upload the file to S3
- Create an MRI import workflow
- Monitor for MRI import completion before moving to the next step

This is a workflow within a workflow. Represented in a diagram, this import-MRI workflow looks like the following: As you can see, it's already starting to get much more involved, and you've only just imported MRI files into the Risk Modeler software. At this stage, a couple of things jump out that you should consider standardizing as you're building out this workflow.
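Sketched in code, the import-MRI steps above might look like the following. This is our illustrative sketch: the client method names (`get_s3_upload_details`, `upload_to_s3`, `create_mri_import_workflow`) are assumptions standing in for the real API calls, not the Risk Modeler API itself:

```python
def import_mri(client, bucket_id, acc_path, loc_path, mapping_path):
    """Sketch of the import-MRI sub-workflow: for each file, get S3 upload
    details and upload it, then create the MRI import workflow.

    The client methods used here are hypothetical placeholders.
    """
    uploaded = {}
    for file_type, path in [("acc", acc_path), ("loc", loc_path), ("mapping", mapping_path)]:
        details = client.get_s3_upload_details(file_type, bucket_id, path)  # Get S3 upload details
        client.upload_to_s3(details, path)                                  # Upload file to S3
        uploaded[file_type] = details
    # Returns a workflow URL to monitor for completion before the next step
    return client.create_mri_import_workflow(bucket_id, uploaded)
```

Because the client is injected, this orchestration can be exercised in tests with a fake client before wiring it to the live APIs.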
Monitoring Asynchronous Jobs

You're going to be monitoring asynchronous jobs for completion throughout your integration with Risk Modeler, so you will benefit from separating the job-monitoring implementation from the rest of the code so that you can call this functionality anywhere you need it.

```python
def workflow_complete(client: RMClient, url: str) -> bool:
    WORKFLOW_COMPLETE = "FINISHED"
    WORKFLOW_FAILED = "FAILED"
    MAX_ITERATIONS = 100
    SLEEP_TIME = 20

    current_iteration = 0
    status = ""
    while status != WORKFLOW_COMPLETE and current_iteration < MAX_ITERATIONS:
        workflow = client.get_workflow(url)
        status = workflow["status"]
        print(status)
        if status == WORKFLOW_COMPLETE:
            return True
        if status == WORKFLOW_FAILED:
            raise ValueError(f"The workflow {url} failed", workflow)
        time.sleep(SLEEP_TIME)
        current_iteration = current_iteration + 1
    return False
```

The code sample above shows an implementation of a workflow_complete function that takes an RMClient (more on this later) and a URL with which to query the workflow status. This function calls get_workflow from Risk Modeler (/v1/workflows/{workflow_id}) and checks the status returned from the workflow API. If the workflow is Finished, the function returns True; if it has Failed, it raises an error; and if it neither finishes nor fails before you've made 100 calls to get_workflow, it returns False.
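A fixed 20-second sleep works, but for a mix of short and long jobs you might poll with exponential backoff instead: check frequently at first, then back off for long-running work. This variant is our suggestion, not part of the original sample; capping the interval and the total wait keeps worst-case latency bounded:

```python
def backoff_intervals(initial: float = 2.0, factor: float = 2.0,
                      cap: float = 60.0, max_total: float = 1200.0):
    """Yield sleep intervals that grow by `factor` up to `cap`, stopping
    once the cumulative wait would exceed `max_total` seconds."""
    interval, total = initial, 0.0
    while total + interval <= max_total:
        yield interval
        total += interval
        interval = min(interval * factor, cap)


# The polling loop then becomes:
#     for pause in backoff_intervals():
#         ...check workflow status...
#         time.sleep(pause)
print(list(backoff_intervals(max_total=30)))  # → [2.0, 4.0, 8.0, 16.0]
```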
The calling code is responsible for handling the workflow completion response. Separating out this monitoring logic lets you standardize the number of attempts to check workflow status as well as the time to wait between requests. Note that including a sleep time here is recommended; otherwise, this function may call the Risk Modeler API too frequently. MAX_ITERATIONS and SLEEP_TIME are values defined here that are tunable as appropriate for your needs: they should be able to accommodate your longest-running jobs, but also return responses quickly for shorter workflows.

Abstracting Common Functionality

The uploads of the Acc, Loc, and Mapping files are subtly different, but they share so much common functionality that it makes sense to group them together and abstract away their implementation to generalize the functionality. Creating an upload_file function that takes all the input parameters into account makes sense in this context.
The following code does just that:

```python
def upload_file(client: RMClient, file_type: str, file_path: str, bucket_id: str) -> S3UploadDetails:
    file_name = file_path.split('/')[len(file_path.split('/')) - 1]
    file_size = os.path.getsize(file_name)
    upload_details = client.get_s3_upload_details(file_type, bucket_id, file_size, file_name)
    aws_upload_client = AWSS3UploadClient(upload_details)
    aws_upload_client.upload_file(file_path, file_name)
    return upload_details
```

Here we've defined a data class called S3UploadDetails, which is a representation of what is returned from Risk Modeler when we ask for S3 upload details (/v1/storage/{bucket_id}/path). We've also defined another client class, AWSS3UploadClient, to handle our AWS S3 communication. Creating this upload_file function now enables you to call it from anywhere you want to upload a file, regardless of the type of file you're uploading.

Encapsulating Client Implementations

In a couple of places here, we've encapsulated client code for Risk Modeler as well as for AWS. This means you don't have to concern yourself with the implementation of those clients as you're building your workflows, and they can be developed independently. In some instances, where it makes sense, you can also create data classes to represent some of the objects you're interacting with in a more strongly typed way and to abstract functionality away from your workflow implementations. The Risk Modeler client you've created exposes the API calls as functions so that they can be called from the code that needs them.
For example, if you want to get a portfolio ID from an EDM based on the portfolio name, you can just call the function get_portfolio_id, passing in the names of the portfolio and EDM, and the function below makes the API call to Risk Modeler.

```python
def get_portfolio_id(self, portfolio_name, datasource_name):
    url = f"{self.base_url}/v2/portfolios/?datasource={datasource_name}&q=name LIKE \"{portfolio_name}\""
    response = requests.get(url, headers=self._default_header())
    return response.json()["searchItems"][0]["id"]
```

Bringing It Together

With these abstractions in place, as well as your client implementations, building a workflow becomes much simpler. Now all you need to do is call your abstractions, e.g. import_mri, geohaz, and process_account, so that the code looks a lot more like the business-centric abstraction in the initial workflow diagram referenced above:

```python
def run_process(arguments: Arguments):
    client = RMClient(arguments.base_url, arguments.api_key)
    portfolio_id = client.create_portfolio(arguments.portfolio_name, arguments.datasource)
    print(f"Portfolio {portfolio_id} Created")

    import_mri(client, arguments, portfolio_id)
    accounts = client.get_accounts_in_portfolio_by_id(portfolio_id, arguments.datasource)
    account_id = accounts[0]["accountId"]

    geohaz_workflow_url = client.geohaz_account(arguments.datasource, account_id)
    print("Submitted GeoHaz")
    if workflow_complete(client, geohaz_workflow_url):
        print("Geocoding complete")
    else:
        raise SystemExit("Geocoding did not complete")

    run_dlm_workflow_url = client.process_account(arguments.datasource, account_id, arguments.model_profile_id)
    print("Submitted DLM")
    if workflow_complete(client, run_dlm_workflow_url):
        print("DLM complete")
    else:
        raise SystemExit("Analysis did not complete")
```

Production Considerations

You've seen that it's both easy to make API calls and potentially challenging to build complex workflows. Getting the abstractions right, as well as implementing the appropriate business logic, is where the complexity lies. Two things we've not spent much time discussing in this post are testing and error handling. When deploying something into production, it's vital that it's properly tested (unit, integration, and acceptance) and can handle and recover from errors that arise in the process. If building out a production implementation seems like a daunting task, and you could use support in designing or building it, or a safe pair of hands to handle the implementation, RMS Consulting can help you in this process. We've developed our own tools and frameworks to help speed up the implementation process and ensure it meets high quality standards. Rather than starting an implementation project from scratch, using the Cat Accelerate framework can make automating Risk Modeler more of a turnkey implementation, saving you both money and time. Reach out to your account manager to speak to RMS Consulting about how we can help you with your implementation. Sample code for the solution built out here is available at https://github.com/RMS-Consulting/databridge-api-workflow/tree/main/PythonRunAModel. This is not production code, but is intended to serve as an implementation example.
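On the error-handling point: transient failures (network blips, throttling) are usually worth retrying before failing a whole workflow. A minimal retry wrapper along these lines is one option; this is our sketch, not part of the sample repository, and the function name is our own:

```python
import time


def with_retries(fn, attempts: int = 3, delay_seconds: float = 0.0,
                 retry_on=(Exception,)):
    """Call fn(); on a retryable exception, wait and try again.

    Re-raises the last exception once `attempts` is exhausted.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            time.sleep(delay_seconds)


# e.g. with_retries(lambda: client.get_workflow(url), attempts=3, delay_seconds=5)
```

Wrapping only the idempotent GET-style calls this way is the safe default; retrying a POST that creates a workflow needs more care.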

Alexandria Julius
January 12, 2022
Interact with Risk Modeler Directly from within Slack Using Our REST APIs and Slack Slash Commands

A Slack Slash Command is one way to write an integration with Slack that plugs custom functionality into Slack for your team to use. Specifically, a Slash Command is an application that can be invoked by users in your Slack workspace. You can define the user input, what is returned to the user, and what happens in the workflow behind the scenes. To show how you can connect to the RMS Intelligent Risk Platform™ from a Slack Slash Command, let's step through creating a Slash Command that a user can invoke to return the status of a given Risk Modeler workflow ID.

Technologies Used

- Slack Slash Command is used to call an API endpoint.
- Serverless Framework is used to create the AWS infrastructure.
- AWS API Gateway is used to create the API endpoint.
- AWS Lambda is triggered by the API call and is the function that calls Risk Modeler. The Lambda function in this tutorial is written in Python.
- AWS CloudWatch is used for function logging.
- AWS Systems Manager (SSM) Parameter Store is used for storing environment variables.

Prerequisites

- Slack account with permission to install the Slack Slash Command.
- AWS command line configured for an existing AWS account.
- Serverless Framework installed.
- Pip package installer for Python.

Architecture

The Serverless Framework provides a way to quickly create and manage the AWS architecture behind the API that the Slack Slash Command calls.

Creating a New Serverless Application

The Serverless Framework allows you to easily deploy an API endpoint by creating and managing the AWS infrastructure. Start by creating a new Serverless application:

serverless create --template aws-python3 --path WorkflowSlackSlashCommand

This command creates a new project in the WorkflowSlackSlashCommand directory and automatically generates serverless.yml, which defines the configuration for the service, and handler.py, which declares the Lambda function. When you open serverless.yml, you can see additional configuration settings you are able to provide for your service.
For this tutorial, update the file with the following code.

```yaml
service: WorkflowSlackSlashCommand
frameworkVersion: '2'
provider:
  name: aws
  runtime: python3.8
  lambdaHashingVersion: 20201221
functions:
  workflow_status:
    handler: handler.workflow_status
    events:
      - httpApi:
          path: "/"
          method: post
    environment:
      API_KEY: ${ssm:api_key}
      ENV_URL: ${ssm:env_url}
```

- service: specifies the name of the Serverless application.
- provider: defines the provider and runtime details.
- functions: defines the Lambda function, its handler, and its trigger. The Lambda function is named workflow_status. The events parameter will create and configure an API Gateway that accepts POST requests and triggers the Lambda function.
- environment: the api_key and env_url values will be read from the AWS SSM Parameter Store.

Install Python Requirements

In Python, the Requests library will be used to make calls to the Risk Modeler software. Add a requirements.txt file to your application, add the Requests library to the requirements file, and install the requirement using the following command:

pip install -t src/vendor -r requirements.txt

This will generate the src folder with the required dependencies.

Add Environment Variables to the Parameter Store

The environment variables API_KEY and ENV_URL for your tenant will be referenced from the AWS SSM Parameter Store. To add your values for those parameters to the store, run:

aws ssm put-parameter --name "api_key" --type "String" --value "api-key value"
aws ssm put-parameter --name "env_url" --type "String" --value "env-url value"

Writing the Handler for the Lambda Function

You are now ready to write the actual Lambda function.
When a user runs the Slack command, Slack makes a POST request to API Gateway, and any user input passed in through the event is encoded in the text field of the event body. When the user submits no input with the slash command, the handler returns the five most recent workflow names and IDs. When the user submits a valid workflow ID, the function returns the workflow status. Otherwise, an error message is returned.

```python
import sys
sys.path.insert(0, 'src/vendor')  # Add installed libraries to path
import os
import json
import requests
import base64


def workflow_status(event, context):
    # Decode workflow ID from user input
    event_body = base64.b64decode(event["body"], validate=True)
    workflow_id = event_body.split(b"text=")[1].split(b"&")[0].decode('utf-8')

    # Structure Risk Modeler API call
    rm_base_url = os.environ['ENV_URL'] + workflow_id
    api_key = os.environ['API_KEY']
    headers = {'Authorization': api_key}
    payload = {}

    # Make Risk Modeler API call
    response = requests.request("GET", rm_base_url, headers=headers, data=payload)

    # Return response from Risk Modeler
    if response.status_code == 200:
        # User provides no workflow ID
        if workflow_id == "":
            all_workflows = json.loads(response.text)["workflows"]
            last_5_workflows = {}
            for i in range(5):
                workflow = all_workflows[i]
                last_5_workflows[workflow["submitTime"]] = {
                    'workflow ID': str(workflow["id"]),
                    'Analysis Name': workflow["name"]
                }
            return json.dumps(last_5_workflows, indent=4)
        # User provides a workflow ID
        else:
            status = json.loads(response.text)['status']
            return {"text": 'workflow ID: ' + workflow_id + "\nstatus: " + status}
    else:
        return response.text
```

Deploying the Serverless Application

Deploying the Lambda function handler and Serverless application configuration is done using the Serverless deploy command:

sls deploy -v

This command creates an AWS CloudFormation template and deploys the stack in AWS. The stack outputs are returned on the command line, including an HttpApiUrl to point the Slack application to. You can view the progress in the CloudFormation dashboard, and once complete, you can also view the API Gateway and Lambda dashboards in AWS. Logging for requests made to the Lambda function is captured in AWS CloudWatch.

Configuring the Slack Slash Command

Now that you have an API URL, you can create a Slack app to call the API. Create an app from scratch, provide an App Name and Workspace, and click Create App. You may need to request admin approval to install your app to the workspace. Navigate into the app you created, click on Slash Commands under Add Features and Functionality, then click on Create New Command. The fields for the new command should be filled out as follows:

- Command: name of the slash command to be accessed from Slack. The command must start with a /, be all lowercase, and have no spaces.
- Request URL: API endpoint from AWS API Gateway, returned as HttpApiUrl above.
- Short Description: description to be shown to the user in the Slack app.
- Usage Hint: hint for users, including any parameters that can be passed into the command.

Test Out Your New Slack Slash Command

Finally, let's test our new Slack Slash Command! Once the application is installed in your Slack workspace, you will be able to preview the application functions and autocomplete entries when you type /workflow in Slack. The command /workflow with no user input returns the five most recent workflow names and IDs. The command with a valid workflow ID, /workflow 4406222, returns the workflow status. The command with an invalid workflow ID, /workflow 123, returns an error message from Risk Modeler.

Manage Message Visibility

By default, the message will be visible only to the user who triggered the command. This is because the JSON returned from API Gateway to Slack has response_type = ephemeral in the response. To make the response visible to all members of the channel where the command was triggered, response_type can be set to in_channel. For example, the sample response in the function handler that is passed out of API Gateway:

```python
return {"text": 'workflow ID: ' + workflow_id + "\nstatus: " + status}
```

Becomes:

```python
return {"response_type": "in_channel", "text": 'workflow ID: ' + workflow_id + "\nstatus: " + status}
```

Redeploy using sls deploy, and the command will be visible to all members of the channel where the command is triggered.
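The body-decoding step in the handler is easy to get wrong, so it can help to pull it into a small helper that can be unit tested without Slack or AWS. The helper name below is our own, not part of the tutorial:

```python
import base64


def extract_command_text(encoded_body: bytes) -> str:
    """Extract the slash command's `text` field from a base64-encoded,
    form-urlencoded Slack request body, as the handler above does."""
    body = base64.b64decode(encoded_body, validate=True)
    return body.split(b"text=")[1].split(b"&")[0].decode("utf-8")


# A Slack body like "token=x&text=4406222&user_id=U1" arrives base64-encoded:
body = base64.b64encode(b"token=x&text=4406222&user_id=U1")
print(extract_command_text(body))  # → 4406222
```

Note that this simple split assumes the `text` field is followed by another `&`-delimited field or the end of the body, which matches how Slack form-encodes the payload.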
