Apache Beam advanced architecture examples

 

Hello, everyone, and thank you for attending our session today. I’m going to walk you through a demo along with my colleague Grigory.

We want to show you how to put together some of the topics and best practices you’ve heard about so far, especially in Lorenzo’s last session. We’re going to create a repository for a hypothetical data pipeline.

However, instead of focusing on the details of the Apache Beam transformations or the pipeline logic, we’re going to focus on the scaffolding components needed for a production-ready project. That means things like how to structure the pipeline to facilitate development and testing; how to do different levels of testing, such as unit testing, transform integration testing, and system integration testing; how to build a continuous deployment pipeline that packages, tests, and deploys your pipeline to the respective environment; and, finally, how to automate the infrastructure components needed by the pipeline, for example BigQuery tables and GCS buckets, which you can extend to include permissions and so on.

 

You can see the repository that we are demonstrating as a kickstart template to extend with your own code, or as a source of useful code snippets and patterns for your new or existing projects.

And as you see here in the agenda, we are going to talk about multiple topics, so please bear with me if some points seem basic to you.

The thing is, we’re trying to cater to a large group of people with different levels of experience. With that, I have two screens here.

 

So I’m going to be switching to the demo screen that I have. Just a quick introduction: within the Google Cloud Platform organization on GitHub, we have a repository for Professional Services, the team that Grigory and I are working for. It has a lot of useful examples and tools if you’re interested in, or working with, GCP.

We usually publish examples and tools that come out of our customer engagements and projects, so you can find a lot of useful examples, code snippets, tools, and solutions to specific problems, and one of them is this Dataflow production-ready repository that we’re presenting today.

So with that, let me walk you through it a bit. As I said, this is a hypothetical pipeline, more specifically a machine learning data pre-processing pipeline, and we’re expecting this kind of input for the pipeline.

 

This is a very simple input: a CSV file containing data for two sets of addresses.

So we have four fields: source address, source city, target address, and target city. The goal of the pipeline is to calculate two similarity features between the two sets of addresses.

So we want to calculate address similarity and city similarity based on some text distance methods or functions.

So we are computing the distance between source address and target address, and between source city and target city.
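Just to make that concrete, here is a rough sketch of what such a similarity calculation could look like, assuming the textdistance library that gets installed later in the demo; the exact distance metric used in the repository may differ.

```python
# Illustrative sketch only: computing similarity features between an address
# pair and a city pair. Assumes the textdistance package; the real pipeline's
# metric and field handling may differ.
import textdistance

def similarity_features(source_address, target_address, source_city, target_city):
    # Normalized similarity: 1.0 means identical strings, 0.0 means completely different.
    return {
        "address_similarity": textdistance.levenshtein.normalized_similarity(
            source_address, target_address),
        "city_similarity": textdistance.levenshtein.normalized_similarity(
            source_city, target_city),
    }

print(similarity_features("street 1", "road 1", "city 1", "city 1"))
```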

 

So let’s have a look at it.

 

How are we doing that from a high-level perspective? This pipeline that we’re demonstrating is written in Python. We also have another version in Java, but we’re not going to be using it today.

So we go to the Python folder here, and again, just a quick note: Python versus Java won’t make a big difference for this talk, because we’re focusing on the other components. As I said, how to structure the pipeline in a logical way, how to do the different kinds of testing, and how to build the continuous deployment pipeline has little to do with the implementation language.

So, within the Python folder, we have the pipeline module itself.

 

Here we also have some scripts to run tests, and we have Dockerfiles for the container image that we’re going to be deploying. This is the main part of the repo for the pipeline, and as you see, it’s a traditional Python package: we have a setup file that is very important when it comes to packaging the pipeline code to run on Dataflow, and we’re going to be passing it around in a lot of configurations.
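As a rough illustration, a minimal setup.py for such a pipeline package might look like the sketch below; the package name and dependency list are placeholders, not the repository’s actual file.

```python
# Minimal setup.py sketch for a Beam pipeline package. Package name and
# dependencies are hypothetical placeholders.
import setuptools

setuptools.setup(
    name="ml_preprocessing_pipeline",
    version="0.1.0",
    packages=setuptools.find_packages(),
    install_requires=[
        "apache-beam[gcp]",
        "textdistance",
    ],
)
```

When launching the pipeline, you would typically point Beam at this file with the --setup_file pipeline option so that the workers install the same package.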

 

We have the main module, the entry point, and we have the pipeline code structured into further modules, like any traditional Python project that you can see anywhere. So let’s have a look at the main method here. As you see, the first step of this pipeline is that we are parsing several arguments.

 

Three of them.

 

We need the path for the input CSV and we need two names or two specs for BigQuery tables, one to store the results and one to store the errors that we encounter during the processing of the data.
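As a sketch, the argument parsing in the main module could look roughly like this; the flag names are assumptions for illustration, not necessarily the ones used in the repository.

```python
# Illustrative argument parsing for the pipeline entry point. Flag names are
# assumptions; check the repository for the exact ones.
import argparse
from apache_beam.options.pipeline_options import PipelineOptions

def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-csv", required=True,
                        help="Path to the input CSV, e.g. gs://bucket/input.csv")
    parser.add_argument("--results-bq-table", required=True,
                        help="BigQuery table spec for results, e.g. project:dataset.results")
    parser.add_argument("--errors-bq-table", required=True,
                        help="BigQuery table spec for parsing errors")
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, PipelineOptions(pipeline_args)
```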

 

In this example, we also want to highlight some of the usage patterns that you can find in Apache Beam: for example, having multiple outputs, or what we call side outputs. I also want to demonstrate how to use side inputs and how to use counters.

I’m going to be showing that within the pipeline. The first actual step of the pipeline is that we read this abbreviations file from the local repository. Basically, what we want to do is apply some cleaning to the addresses that we have: we want to map things like 'str' to 'street', 'rd' to 'road', 'av' to 'avenue', these kinds of address abbreviations. We read this mapping into a Python dictionary, and we want to make it available to all the processing nodes, all the Dataflow workers, so that they can access it locally and do the element-by-element transformation or cleaning.
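A minimal sketch of that loading step, assuming a simple "abbreviation,expansion" CSV format (the real file’s name and layout may differ):

```python
# Load an abbreviations file into a plain Python dict, e.g.
# {"str": "street", "rd": "road", "av": "avenue"}.
import csv

def load_abbreviations(path="abbreviations.csv"):
    mapping = {}
    with open(path, newline="") as f:
        for abbreviation, expansion in csv.reader(f):
            mapping[abbreviation.strip().lower()] = expansion.strip().lower()
    return mapping
```

Because it’s an ordinary in-memory dictionary, it can later be handed to a DoFn as an extra argument, which is the side-input pattern shown further down.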

 

I’m going to show you how we’re using it once we come to that point. You can see that for the pipeline itself, we start the Beam pipeline like that, and we’re structuring it into three main components following the ETL abbreviation.

In the first one, we’re logically grouping all the steps needed for extracting and parsing the data; in the second one, all the steps needed to clean, calculate, and basically do all the transformations that we have.

And finally, we’re loading the results into the BigQuery table like here, and we’re also doing that for the parsing errors that happened during the pipeline. One way to explain why we’re doing it in this exact way is that it facilitates our testing.
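In code, the top-level ETL composition described here might look roughly like the following; the run() signature and the transform and tag names are placeholders for illustration (the transforms themselves are sketched later).

```python
# Rough sketch of the top-level ETL composition: extract, transform, load.
# ExtractAndParse and PreprocessAndCalculate are placeholder names for the
# composite transforms sketched later in this write-up.
import apache_beam as beam

RECORDS_TAG = "records"
ERRORS_TAG = "errors"

def run(pipeline_options, input_csv, results_table, errors_table, abbreviations):
    with beam.Pipeline(options=pipeline_options) as p:
        # Extract: read the CSV and split the output into parsed records and errors.
        parsed = p | "Extract" >> ExtractAndParse(input_csv)
        records, errors = parsed[RECORDS_TAG], parsed[ERRORS_TAG]

        # Transform: clean the addresses and compute the similarity features.
        results = records | "Transform" >> PreprocessAndCalculate(abbreviations)

        # Load: write results and parsing errors to their BigQuery tables.
        results | "LoadResults" >> beam.io.WriteToBigQuery(results_table)
        errors | "LoadErrors" >> beam.io.WriteToBigQuery(errors_table)
```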

 

For example, this PTransform that encapsulates all the core processing logic of the pipeline can be tested in isolation, and we can automate this testing using static inputs.

I don’t need to care about reading from CSV, GCS, or BigQuery.

I don’t care about that.

I just need to focus on testing the core transformations of the pipeline using static input. If we weren’t doing it this way, it would be hard to do so. Imagine if we had one pipeline that contains all the steps one after another: read, apply this ParDo or DoFn, apply this transformation, and write. It would be hard for us to isolate the steps in between and do proper unit testing or integration testing.

 

So let’s have a quick look at what’s happening in the extract transform. In this one, I’m going to show you two main patterns with Apache Beam.

First of all, I think by now you know the difference between a PTransform and a DoFn.

A PTransform is a logical grouping of a set of other steps, like DoFns, reads from files or databases, and so on.

So in this one, we’re using Beam IO to read from the CSV file that we just saw.

 

We’re doing a quick reshuffle just for performance reasons: in case we’re reading big files, we want to redistribute them into smaller bundles, and then we’re applying the parsing logic via a DoFn.

So here we’re applying a Beam ParDo, and we have a DoFn, ParseCSV.

It’s doing something a bit interesting here: it’s using the with_outputs method. What does that mean? It means that this DoFn returns multiple outputs.

So this DoFn will return something like a tuple or dictionary of PCollections, records and errors, and then later, in the next step, we can access or index them separately.

 

So you can see that we’re extracting the records just by indexing the outputs with this name, the correct-output tag, and the same thing for the errors.
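Putting that together, a sketch of such an extract PTransform could look like this; the names and tags are illustrative, and the ParseCSV DoFn is sketched in the next part.

```python
# Sketch of the extract pattern: read, reshuffle, then parse with a
# multi-output DoFn. Names and tags are illustrative placeholders.
import apache_beam as beam

RECORDS_TAG = "records"
ERRORS_TAG = "errors"

class ExtractAndParse(beam.PTransform):
    def __init__(self, input_csv):
        self._input_csv = input_csv

    def expand(self, pipeline):
        return (
            pipeline
            | "ReadCSV" >> beam.io.ReadFromText(self._input_csv)
            | "Reshuffle" >> beam.Reshuffle()
            | "ParseCSV" >> beam.ParDo(ParseCSV()).with_outputs(
                ERRORS_TAG, main=RECORDS_TAG)
        )
```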

 

So let’s have a quick look at the DoFn itself and how it’s creating these multiple outputs, this Beam pattern of side outputs. This DoFn, as you see, demonstrates two things.

First of all, we want to have some counters. Beam lets us use metrics in general, and one kind of metric is a counter.

So in this one, we want to count the number of input records, the number of correct records that we could parse, and the number of rejected records.

 

All these counters would be available to you in the Dataflow UI once you deploy the pipeline and it’s running. That’s very useful when you have streaming pipelines, for example, because while the job is running you can see these counters and take action. What is even better is that these metrics are collected and written automatically to Stackdriver, where you can build custom dashboards or alerting rules, for example whenever the number of rejected records reaches some limit, triggering something like a Pub/Sub message that kicks off another workflow, whatever you can think of. The counters are used here in the process function itself, which processes element by element.

We do some simple checks: we want to ignore the header line, and otherwise we parse the line itself simply by splitting it on the separator.

 

In case we parse the record correctly, we increment the counter for correct records. And if you see here, whenever we start processing an element, we increment the counter for input records, and whenever we reject an element, we increment the counter for rejected records.

So basically we are yielding two constructs called tagged outputs; think of them as named PCollections, in a way. In this path, we’re creating a tagged output with this name, the static tag name, and including the record that we just correctly parsed. For the errors, we’re doing another tagged output, with this other name and this record as the payload.

 

So in this case, we want to create a new kind of record, a simple tuple or dictionary that has the error message and the raw line that caused the failure.
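Here is a hedged sketch of what such a multi-output DoFn with counters could look like; the field names, the header check, and the error payload are assumptions for illustration.

```python
# Sketch of a multi-output ParseCSV DoFn with metrics counters. Field names,
# tag names, and the header check are illustrative assumptions.
import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.pvalue import TaggedOutput

RECORDS_TAG = "records"
ERRORS_TAG = "errors"
FIELDS = ["source_address", "source_city", "target_address", "target_city"]

class ParseCSV(beam.DoFn):
    def __init__(self, separator=","):
        super().__init__()
        self._separator = separator
        self.input_counter = Metrics.counter(self.__class__, "input_records")
        self.correct_counter = Metrics.counter(self.__class__, "correct_records")
        self.rejected_counter = Metrics.counter(self.__class__, "rejected_records")

    def process(self, line):
        self.input_counter.inc()
        if line.startswith("source_address"):  # skip the header line
            return
        fields = line.split(self._separator)
        if len(fields) == len(FIELDS):
            self.correct_counter.inc()
            yield dict(zip(FIELDS, fields))          # main (records) output
        else:
            self.rejected_counter.inc()
            yield TaggedOutput(ERRORS_TAG, {         # tagged (errors) output
                "error": "unexpected number of fields",
                "line": line,
            })
```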

 

So let’s go back to the PTransform itself.

That’s what you see: with_outputs returns multiple tagged outputs, and that’s how we’re extracting them and returning them to the main function. That’s why here we have two PCollections. It’s easy with the parsing errors: we directly write them to BigQuery like that, using the parameter that we got from the user, the errors BigQuery table.

The interesting part happens with the parsed records. Here you can see that we start with the parsed-records PCollection and we apply to it the second PTransform, the pre-processing transform. You can see that the pre-processing transform here takes the abbreviations dictionary, the one that we read in earlier; later on it will be passed to a DoFn, and that’s how we do the side input pattern.

 

So let’s have a quick look again at what’s happening here.

 

So this is another PTransform that encapsulates the core transformation logic of the pipeline, and it does two main steps.

The first one is that we clean and transform the data while applying the abbreviation mapping that we talked about, and the second one is to calculate the similarity features.

Given the parsed and cleaned PCollection that we have, we calculate the similarity features, which are the output. Quickly, I just want to show you here how we do the side inputs.

So if you remember, this is a DoFn, and in every DoFn there is a process method that processes element by element.

 

However, if you want to extend that so that it accepts another input, which is pretty straightforward in Python, you can just add it as an extra argument, as we do here. This is the abbreviations dictionary; it’s a plain Python dictionary. You don’t need to pass it as a PCollection or wrap it in anything, just a dictionary, and the Beam SDK takes care of shipping it to the Dataflow worker nodes. A rough sketch of this pattern follows below.
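The sketch reuses the similarity_features helper and the PreprocessAndCalculate name from the earlier sketches; class and field names remain illustrative.

```python
# Sketch of the side-input pattern: the DoFn's process method takes the
# abbreviations dictionary as an extra argument. similarity_features is the
# helper from the earlier sketch; names are illustrative.
import apache_beam as beam

class CleanAddresses(beam.DoFn):
    def process(self, record, abbreviations):
        def expand(text):
            # Replace each known abbreviation token with its full form.
            return " ".join(abbreviations.get(token, token) for token in text.split())

        yield {key: expand(value) for key, value in record.items()}

class PreprocessAndCalculate(beam.PTransform):
    def __init__(self, abbreviations):
        self._abbreviations = abbreviations

    def expand(self, records):
        return (
            records
            | "Clean" >> beam.ParDo(CleanAddresses(), self._abbreviations)
            | "Similarity" >> beam.Map(lambda r: {**r, **similarity_features(
                r["source_address"], r["target_address"],
                r["source_city"], r["target_city"])})
        )
```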

 

Again, as I said, we’re not focusing that much on what’s happening inside the pipeline; I just want to show you some high-level patterns and why we’re structuring the pipeline this way. As I said, this is basically to facilitate testing at several layers.

So with that, let’s have a look at the different kinds of testing that we’re talking about and how we apply them in Apache Beam and Dataflow. You have probably seen this diagram somewhere in the previous sessions, or you will in the next ones, but here are the layers.

As you see, we have different layers within the pipeline. We start with the DoFns, let’s say the atomic unit of transformation in Apache Beam.

 

We also have some straightforward Python utility functions that we want to unit test. For these, we do unit tests in isolation, without reading from input datasets or writing anywhere, just with static input.

Then, if we combine some DoFns together into a PTransform, as we did with the transformation PTransform that I just showed you, this could be either a composite transform test or what we call a transform integration test; it depends on how many PTransforms you’re putting under test.

 

Anyway, the important point here is that we want to automate both of these levels of testing and run them in isolation.

We don’t want to read real input or write real output.

We just want to use static input and an expected static output, compare them, and keep repeating this on every build, every time we deploy the pipeline to an environment.

The last kind of testing is either a system integration test or an end-to-end test like this one. As you can see, this one not only tests the core transformations or functions within the pipeline; it also tests the deployment of the pipeline.

 

What if we build a Dataflow template, as we’re going to do, and the unit tests only cover the transformations, but after deploying the pipeline something goes wrong?

Maybe there is a problem with the Docker image that I’m using and the Dataflow job doesn’t start.

So this kind of system integration test is very important, and we can do it at different scopes or in different ways.

For example, we could use a sample static input as part of our continuous deployment pipeline, stage it somewhere like a GCS bucket, write the data to some staging or testing BigQuery tables, and just do a smoke test: do we have the expected number of records in the expected tables or not? Or we can run it end to end on a full dataset and test things like performance metrics. So these are the different levels of testing that we’re talking about, and now I want to go back and show you how we do that within the pipeline. In Python, we have the tests module, and the first one is very straightforward.

 

Just as an example, we have a utility function that maps abbreviations, the one that takes things like 'str' and converts them into 'street', and so on, and this is a plain Python unit test.

There’s nothing Beam-specific about it, but in a production Dataflow pipeline we should also be testing this kind of thing.

 

So here we have the abbreviations, we apply the abbreviation mapping, we have the expected results, and we do an assertion.
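A plain-Python sketch of such a test might look like this; the function and test names are hypothetical.

```python
# Plain Python unit test for an abbreviation-mapping utility, roughly as
# described above. Function and test names are assumptions for this sketch.
import unittest

def map_abbreviations(text, abbreviations):
    return " ".join(abbreviations.get(token, token) for token in text.split())

class MapAbbreviationsTest(unittest.TestCase):
    def test_known_abbreviations_are_expanded(self):
        abbreviations = {"str": "street", "rd": "road"}
        self.assertEqual(
            map_abbreviations("main str 5", abbreviations), "main street 5")

if __name__ == "__main__":
    unittest.main()
```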

 

So basically we’re going to do the same thing, but using some Apache Beam SDK helpers that are useful for that. If we go back to the tests, let’s start with ParseCSV.

If you remember, this one was a DoFn, so we create a unit test case. In this case, we’re testing only the DoFn, and if you remember, this DoFn reads the input and returns two outputs.

I want to show you how we test this kind of two-outputs, or side-outputs, pattern. We create some input: an accepted record, so this line should be parsed by the pipeline, and a rejected one, this line should be rejected because it’s missing the city attribute.

We combine them into the input data, accepted records plus rejected records, and we have the same for the expected results: one for the expected successful result, which is a parsed record rather than just a line, and one for the expected rejected entry, including the error message and the rejected line itself.

 

How do you do the unit testing, then? The Apache Beam SDK contains this TestPipeline class that we can use, so we instantiate a pipeline and we start with Create.

I think this might be the most important part of the unit test, the Create transform: instead of reading from external sources like Pub/Sub, GCS, or BigQuery, you can create a PCollection out of some static input.

So in this case, we have the static input and we inject it into the pipeline using Create, and then to test the DoFn we just apply it in this test pipeline, the same way we’re using it in the pipeline itself. So we have this ParDo, we apply the DoFn, and it also uses the with_outputs method, the same as in the pipeline, returning two outputs. We take the first tagged output, which you can access by its name, the correct-output tag, to collect the values behind that tag, and the same thing for the errors tag. Then you do the same thing as in a plain Python unit test: we make assertions.

 

We assert that this PCollection is equal to the expected PCollection, and we do that for both of them.
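A sketch of such a DoFn test, reusing the ParseCSV sketch from earlier, could look like this; the literal input lines and expected values are illustrative.

```python
# Sketch of a DoFn unit test with TestPipeline, Create, and assert_that.
# Reuses the ParseCSV, RECORDS_TAG, and ERRORS_TAG sketches from earlier.
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

class ParseCSVTest(unittest.TestCase):
    def test_records_and_errors_are_split(self):
        accepted_line = "main str 5,city 1,main road 5,city 1"
        rejected_line = "main str 5,city 1,main road 5"  # missing the target city
        expected_record = {
            "source_address": "main str 5", "source_city": "city 1",
            "target_address": "main road 5", "target_city": "city 1"}
        expected_error = {"error": "unexpected number of fields",
                          "line": rejected_line}

        with TestPipeline() as p:
            outputs = (
                p
                | beam.Create([accepted_line, rejected_line])
                | beam.ParDo(ParseCSV()).with_outputs(ERRORS_TAG, main=RECORDS_TAG))

            assert_that(outputs[RECORDS_TAG], equal_to([expected_record]),
                        label="CheckRecords")
            assert_that(outputs[ERRORS_TAG], equal_to([expected_error]),
                        label="CheckErrors")
```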

 

So that was how to unit test a DoFn.

Let’s have a look at how to do that for a PTransform. Unit testing a PTransform in the way we designed and structured the pipeline is exactly what I want to highlight here. So again, we have all the core pipeline transformations included in one PTransform. Of course, it depends on the pipeline size: if it’s too complicated, you can split it into multiple, let’s say, major steps. But this way you can put the core transformations under test without needing to read data from external sources or write it to external sinks.

 

So in this one, we create the input data as an array of records, and in this case we only have one record.

We have the expected output, and notice that in the expected output here the similarity features are already calculated: the address similarity is zero because the addresses are completely different, like 'street 1' and 'road 1', but the city similarity is one because 'city 1' and 'city 1' are the same. So we’re expecting zero and one.

 

We start the test pipeline like that. In this case, just to make it simpler, we’re not testing the mapping function; we test that separately. But you can create the abbreviations dictionary here, and then it’s the same pattern: we create the input PCollection using Create with the static data, and then in the pipeline we apply the PTransform, passing it the abbreviations dictionary. And, as we have seen before, we make an assertion that the output PCollection is equal to the expected output. And that’s it.
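A sketch of that transform-level test, reusing the PreprocessAndCalculate sketch from earlier: in the repository the expected similarity values are hard-coded (zero and one in this example), while here they are derived from the same helper just to keep the sketch self-consistent.

```python
# Sketch of a transform-level (composite transform) test. Reuses the
# PreprocessAndCalculate and similarity_features sketches from earlier;
# input values are illustrative.
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

class PreprocessAndCalculateTest(unittest.TestCase):
    def test_similarity_features_are_added(self):
        abbreviations = {}  # the mapping is tested separately; keep it empty here
        input_record = {
            "source_address": "street 1", "source_city": "city 1",
            "target_address": "road 1", "target_city": "city 1"}
        expected = [dict(input_record, **similarity_features(
            "street 1", "road 1", "city 1", "city 1"))]

        with TestPipeline() as p:
            output = (
                p
                | beam.Create([input_record])
                | PreprocessAndCalculate(abbreviations))
            assert_that(output, equal_to(expected))
```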

 

So this is basically how to test the DoFns and how to do the transform integration test, or the composite transform test: testing one PTransform or multiple PTransforms works the same way.

What about the system integration test, though? For this, we need some sort of script, because the job isn’t executed locally within the continuous deployment pipeline; it runs externally on the deployed environment, on GCP, for example.

 

So if we go to the root folder, we have this integration test script.

You can automate it with a bash script or with Python; currently we’re using bash, but we want to port it to Python to make it more easily readable for more people.

Let’s go quickly through the logic. The first step is that we prepare some GCP test resources: we create a GCS bucket for the system integration testing, and we create a BigQuery dataset and a couple of tables using the JSON schemas that we store within the repository, a results table and an errors table. The second step here is to invoke or run the Flex Template, the job template that we already deployed in a previous step; Grigory is going to talk about that part.

 

We execute it on GCP, passing it the parameters that we just created, the system integration test parameters, and after it starts running, we loop on it: every 60 seconds we go and check whether the job is still running.

Eventually we either get a failed job status, in which case the integration test fails because the job had an error, or the job is successful.

If it’s successful, we move to the last part of the system integration test, which is checking the results and error tables in BigQuery.

For this, we created a simple query. Since we created the input to this integration test, we know the content of the file.

So in this simple example, we know we expect two records to be inserted into the results table and one into the errors table, and we write a query that returns a single true or false based on some boolean flags: we check whether the first table has two rows and whether the second table has one record, true or false, and then we AND them together. It’s just one way for the system integration check to tell whether the run succeeded or not.

 

If the check passes, we finish the system integration test: everything is okay and we can move forward. If not, it fails, and in turn that fails, for example, the continuous deployment pipeline that invokes this script every time we make a commit or deploy to an environment.
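Since the script is meant to be ported to Python anyway, here is a hedged sketch of what the BigQuery smoke check could look like with the google-cloud-bigquery client; the table names and expected counts are placeholders for this example.

```python
# Hedged sketch of the BigQuery smoke check (the repository currently does
# this in bash). Table names and expected row counts are placeholders.
from google.cloud import bigquery

def smoke_check(project, dataset):
    client = bigquery.Client(project=project)
    query = f"""
        SELECT
          (SELECT COUNT(*) FROM `{project}.{dataset}.results`) = 2
          AND (SELECT COUNT(*) FROM `{project}.{dataset}.errors`) = 1
          AS check_passed
    """
    row = list(client.query(query).result())[0]
    if not row.check_passed:
        raise SystemExit("System integration test failed: unexpected row counts")
    print("System integration test passed")
```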

 

So with that, we’ve talked about the pipeline structuring, and we’ve talked about testing.

The last thing that I want to quickly talk about is Dataflow templates. We mentioned that at some point we of course need to deploy this pipeline, and there are multiple ways of running pipelines on Dataflow.

 

Let’s explain them very quickly here.

 

The first one is an on-demand run command: you’re a developer, you have your local environment and your code, and every time you submit a job to the Dataflow runner, the Dataflow service, you’re doing two steps in one go. The Dataflow service stages your code and dependencies in a GCS bucket, then reads from it and runs your pipeline on Dataflow.

Every time you want to run this job, you need access to the local environment, the pipeline code, the dependencies, and so on.

But as a developer, I want to deploy the template or the job once and let other users run it as well.

 

In this case, we need to separate storing the pipeline from executing the pipeline, and that’s where we use templates. We have two different kinds of templates, classic templates and Flex Templates, and the main difference is how they store the code and dependencies.

For classic templates, whenever the developer issues a create-template request, a template file is created in GCS containing some metadata about the pipeline: a description, the kinds of parameters that are expected, and where the code and dependencies have been staged. Whenever a user submits a run request for a template, they just reference that template file in GCS.

The request is executed by Dataflow, and by reading the template file Dataflow knows where to pick up the code and dependencies and run your pipeline. Flex Templates work the same way, except that instead of staging your code and dependencies in a GCS bucket, you build a container image that is also referenced by a template file, so that if a user wants to run this template, they can pass the request to Dataflow and reference the template file in GCS.

 

How do we do that here? In this example, in our repo, we are using a Flex Template, and to do that, we need to have a Dockerfile.

So within the Python folder we have a Dockerfile, a very simple one, that describes how to package the pipeline code that we have: we start from the Python 3 base image for Dataflow Flex Templates, and we copy the pipeline code in.

I think the most important step here is that we install all the requirements, all the Python dependencies that we have in the repository, using pip on the image; for example, the textdistance Python library that we use is installed here. And finally, this environment variable points to the main Python file of our pipeline code, so that whenever Dataflow invokes this template or this job, it knows where to start the pipeline. With that, I’ll hand it over to Grigory, who’s going to talk about how to put all these steps together into a continuous deployment pipeline and how to automate the infrastructure components as well. Grigory, it’s all yours now.

 

Thank you very much, Karim. Hi everyone, my name is Grigory, and in this project I was responsible for the CI/CD automation part.

So the first thing you have to do is define the steps, and their order, for the CI/CD pipeline. Generally, to execute this CI/CD pipeline, you have to run two stages: one is running the unit tests, and the second is running the integration tests. Running the unit tests is quite easy.

 

We just execute the shell script that was shown to us earlier. For running the integration tests, we first have to deploy the template, because the integration tests run against the template. To deploy the template, we have to build the Docker image, push the Docker image to the registry, and trigger the gcloud CLI command that builds the template. After that, as the last step, we can execute the shell script that runs the integration tests.

 

The next thing you have to do is choose the CI/CD tool. It could be Jenkins, it could be GitLab CI; generally it could be anything.

In this case, we decided to use Cloud Build.

First of all, it’s a GCP product, and second, it’s a managed product, which saves us from a lot of quite complicated questions, like how to organize access to the GCP project, how to store credentials, how to update plugins, how to manage connections, and all the other problems which I believe could be a good topic for a long talk of their own.

Generally, you would expect the cloudbuild.yaml manifest containing the Cloud Build steps to be in the root of the repository, but in this case we have two different pipelines, one Java and one Python, and we believe that the Java and Python pipelines could have different steps in their CI/CD pipelines, so our Cloud Build YAML file for the Python pipeline is located in the Python folder.

 

Based on what I said about the steps that have to be executed, you can see those five steps here.

The first one is unit testing; it just invokes the shell script that was shown earlier.

The second is building the container; it calls the docker build command with a batch of parameters.

The third one pushes the resulting image to the GCP container registry.

Next, we execute the gcloud dataflow flex-template build command, which generates the template and puts it in a bucket, and lastly we run the integration tests,

 

which is another shell script. So, five steps. To make this script generic, we believe some of the variables have to be parameterized, like the project ID, the image name, or the image tag. Some of them should probably be set once; that’s why in the substitutions block they have default values, while others should be defined at runtime, say, the image tag.

This parameter probably has to be defined on each run, based on your versioning strategy: maybe you’re going to use the git commit hash, maybe you have some type of semantic versioning where you bump the version based on the commit message.

Nevertheless, you can pass these parameters either with the cloud build command or define them in the defaults.

 

How does this pipeline work? Let me demonstrate it.

This is a gcloud builds submit command. Generally, if you execute this command without parameters, it’s going to search for the cloudbuild.yaml file in the root folder. But in this case, as we have a separate Cloud Build YAML file for each pipeline, we have to trigger the one from the proper directory, so from the Python folder.

When I run this Cloud Build command, it triggers the build, and you can see the Cloud Build process already running in my project; if I refresh the page, you see the same pipeline currently running here with all its steps. The whole pipeline takes about 12 minutes to run, so I won’t wait here until the end of it, and we’ll keep showing you things in the meantime.

 

We also automated this CI/CD pipeline. I was able to execute it manually, but of course you’re not going to do that in your everyday development life; you would probably prefer having the CI/CD pipeline triggered on a push event.

So let me configure Cloud Build to run on push. For this, I will use a Cloud Build trigger: I choose a name for the trigger, and I want to execute this pipeline on any pull request.

 

Technically, you can have a different branching strategy in your git repository; in this case I will use the pull request, but you could use something like a push to a branch or pushing a new tag, whatever you choose. The repository is the one I’m storing the code in, and I want to execute it on any branch. The rest of the settings we could check are not relevant here.

The most important thing is the Cloud Build file: in this case I need to define the Cloud Build file again, and it’s going to be the file from the Python folder.

 

So now I create a trigger that listens for pull requests: whenever a pull request is created, it’s going to trigger the pipeline.

Currently we have one Cloud Build running; let me create a pull request so that you can see the pipeline being triggered a second time.

I switch to my current branch, which is going to be test-feature-2, and I create the pull request. I push it back to my repository, just give it a second. So I open a pull request from the test-feature-2 branch to the master branch, and then I click to create the pull request.

 

So what’s going to happen now? We’re going to get a second Cloud Build; yeah, exactly, now it’s running. That’s exactly the same pipeline, now being triggered not manually from the CLI but automatically from the Cloud Build trigger on each pull request.

What is interesting here is that I can get back, in my pull request, the result of the CI/CD process. You can see the Dataflow trigger check: this one is currently running and it has a yellow sign, which means that we are still waiting for the result here.

 

Whenever this one succeeds, which will take another 12 minutes, we will get a green sign here, saying that the pipeline has run and we are sure that whatever changes have been pushed to this branch don’t break anything and are ready to be merged into the master branch. Technically, if you want, you can also go to the settings and block merging the branch into master unless all the required checks here are fulfilled.

So all the checks you need are being run. The whole demonstration here, as I told you, will take another 12 minutes, so maybe we’ll be able to see the result of it; otherwise, you’ll have to trust me that it works.

Meanwhile, I will walk you through another interesting topic. This was a CI/CD pipeline for the Dataflow pipeline itself, but generally, each Dataflow pipeline needs additional pieces of GCP infrastructure, like buckets or BigQuery tables, maybe something else.

Those things should generally also be automated. Why? Because whenever you push changes to your code that rely on parts of this infrastructure already existing, you should have a guarantee that those things are already there.

 

The current industry standard for infrastructure as code is Terraform. In this case, if you switch here in the repository, we have a terraform folder describing the BigQuery tables which are used by our Dataflow pipeline.

This thing is here, just a second, there, yeah: those tables are defined here, and in the schema folder these are JSON files containing the schemas for the BigQuery tables.

Now, what if you want to automate the creation of those tables, or maybe even manage it with a CI/CD pipeline? To do that, we added another Cloud Build configuration. Generally, we believe that the infrastructure code should be separated from the Dataflow pipeline code and reside in a different repository, but for the sake of simplicity of this example, they are in the same repository here; to keep them logically separated, we use two different Cloud Build YAML files.

 

The Terraform CI/CD pipeline is also quite an interesting topic on its own. In this implementation we use the simplest approach: just run terraform init, plan, and apply. Generally, you would have way more complicated strategies here, based on the branches: maybe you’d produce plans, store them somewhere in buckets, compare them, and so on. So just see this as the simplest possible implementation to show the general approach to how it can be handled.

This is the Cloud Build configuration: three steps. The first is terraform init, which just calls the init command; the second is terraform plan, and the third one is terraform apply.

We also pass the variables here. In this case, the project ID is not passed explicitly, but Cloud Build can fetch the project where it is running.

 

So we don’t have to tell it what our project is; Cloud Build decides based on the project it runs in. Now, if I execute this pipeline from the CLI, the same way I did with the previous CI/CD pipeline, give it a second to cancel the running one and run another one, we will see two additional tables being deployed to our GCP project. Let me show you how it looks: this is BigQuery, and currently in this test project I don’t have any tables.

Now, if I execute this pipeline, and it will take about 30 seconds to run, you will see two additional tables being created in this dataset. It’s currently running terraform init; the next step goes to terraform plan, and you should already see the execution plan: it highlights that several tables are going to be created. Now it has switched to terraform apply, and the pipeline is finished.

So if I refresh the page, I can see the two tables created exactly here.

 

This is the production-ready dataset, with the ML pre-processing errors table and the ML pre-processing results table, and you can see them in the repository here: if I click on the schema folder, there are the ML pre-processing errors and results schemas.

If I add an additional schema file here, I will get an additional table in Dataflow, sorry, in BigQuery. Technically, I can also automate this with a CI/CD pipeline and a trigger.

So if I go and, let’s say, create a trigger and say that it’s going to be for the Terraform part, and I’m going to execute my pipeline this time on every push to a branch; the repository is going to be the same, and the branch is going to be any branch. And I need a Cloud Build file; in this case, I’m going to use the Terraform Cloud Build file, and that’s all. So this trigger is going to execute this CI/CD pipeline on each push to the repository, and I can show you an example of it from here.

 

If I go to the schema folder and create an additional schema file here, let me copy the content from any existing file, say from this one, paste it in, and then all I need to do is save it. Let me call it just 'test' and save it. What I have to do now is push the changes.

I see the change in my test file, okay, so I do a git add and git push, give me a second to enter the credentials, and now we should see the CI/CD pipeline being executed from the history tab. And here we go.

This one is... nope, just give it another second. Nope, still not there.

Let me quickly take a look here. So I pushed it to the test feature branch, and this is the curse of real-time examples: whenever you demonstrate something live, something goes wrong.

Let me quickly check it from here, and let me check the trigger. So I ran it from the test-feature-2 branch, this one, yeah, and now I see my Terraform CI/CD pipeline running.

 

It will take another 30 seconds, and we should be able to see an additional table created in BigQuery.

Currently the plan step is running, and here we go: I see the test table. So in this case it created the test table based on the update I pushed to the repository, which automates the infrastructure creation. All I need to care about is having my Terraform files in the proper place in the repository and pushing updates to those files whenever I need an additional piece of infrastructure for the Dataflow pipeline.

And with that, I believe, that was the story about the CI/CD automation for this Dataflow project.

 
