Creating JDBC source table schema (DDL) dynamically

Creating JDBC source table schema (DDL) dynamically

Kadam, Gangadhar (GE Aviation, Non-GE)
Hi All,

I am trying to build a Spark application that reads data from PostgreSQL (source) in one environment and writes it to PostgreSQL/Aurora (target) in a different environment (e.g. PROD to QA or QA to PROD) using Spark JDBC.

When loading the DataFrame into the target DB, I would like to enforce the same schema as the source table, using

val targetTableSchema: String =
  """
    |  operating_unit_nm character varying(20),
    |  organization_id integer,
    |  organization_cd character varying(30),
    |  requesting_organization_id integer,
    |  requesting_organization_cd character varying(50),
    |  owning_organization_id integer,
    |  owning_organization_cd character varying(50)
""".stripMargin


.option("createTableColumnTypes", targetTableSchema )

I would like to know if there is a way to create this targetTableSchema (source table DDL) variable directly from the source table or from a CSV file. I don't want Spark to apply its default type mapping. Given the table name, how do I generate the DDL dynamically and pass it to the targetTableSchema variable as a string?

Currently I am updating targetTableSchema manually and am looking for pointers on how to automate it.
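For the CSV route, I imagine something like the rough sketch below could work (the file name and the simple column_name,data_type line format are just assumptions on my part):

// Hypothetical sketch: build the createTableColumnTypes string from a CSV
// whose lines look like "organization_id,integer" or
// "organization_cd,character varying(30)".
import scala.io.Source

def columnTypesFromCsv(path: String): String = {
  val src = Source.fromFile(path)
  try {
    src.getLines()
      .filter(_.trim.nonEmpty)
      .map { line =>
        // Split only on the first comma so types like numeric(10,2) survive intact
        val Array(name, dataType) = line.split(",", 2).map(_.trim)
        s"$name $dataType"
      }
      .mkString(",\n")
  } finally src.close()
}

// val targetTableSchema = columnTypesFromCsv(inputDir + "target-table-schema.csv")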


Below is my code:

import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

// Define the parameters
val sourceDb: String = args(0)
val targetDb: String = args(1)
val sourceTable: String = args(2)
val targetTable: String = args(3)
val sourceEnv: String = args(4)
val targetEnv: String = args(5)

println("Arguments Provided: " + sourceDb, targetDb,sourceTable, targetTable, sourceEnv, targetEnv)

// Define the spark session
val spark: SparkSession = SparkSession
  .builder()
  .appName("Ca-Data-Transporter")
  .master("local")
  .config("driver", "org.postgresql.Driver")
  .getOrCreate()

// define the input directory
val inputDir: String = "/Users/gangadharkadam/projects/ca-spark-apps/src/main/resources/"

// Define the source DB properties
val sourceParmFile: String = if (sourceDb == "RDS") {
    "rds-db-parms-" + sourceEnv + ".txt"
  }
  else if (sourceDb == "AURORA") {
    "aws-db-parms-" + sourceEnv + ".txt"
  }
  else if (sourceDb == "GP") {
    "gp-db-parms-" + sourceEnv + ".txt"
  }
  else "NA"

println(sourceParmFile)

val sourceDbParms: Properties = new Properties()
sourceDbParms.load(new FileInputStream(new File(inputDir + sourceParmFile)))
val sourceDbJdbcUrl: String = sourceDbParms.getProperty("jdbcUrl")

println(s"$sourceDb")
println(s"$sourceDbJdbcUrl")

// Define the target DB properties
val targetParmFile: String = if (targetDb == "RDS") {
    s"rds-db-parms-" + targetEnv + ".txt"
  }
  else if (targetDb == "AURORA") {
    s"aws-db-parms-" + targetEnv + ".txt"
  }
  else if (targetDb == "GP") {
    s"gp-db-parms-" + targetEnv + ".txt"
  } else "aws-db-parms-$targetEnv.txt"

println(targetParmFile)

val targetDbParms: Properties = new Properties()
targetDbParms.load(new FileInputStream(new File(inputDir + targetParmFile)))
val targetDbJdbcUrl: String = targetDbParms.getProperty("jdbcUrl")

println(s"$targetDb")
println(s"$targetDbJdbcUrl")

// Read the source table as dataFrame
val sourceDF: DataFrame = spark
  .read
  .jdbc(sourceDbJdbcUrl, sourceTable, sourceDbParms)
  //.filter("site_code is not null")

sourceDF.printSchema()
sourceDF.show()

val sourceDF1 = sourceDF.repartition(
  sourceDF("organization_id")
  //sourceDF("plan_id")
)


val targetTableSchema: String =
  """
    |  operating_unit_nm character varying(20),
    |  organization_id integer,
    |  organization_cd character varying(30),
    |  requesting_organization_id integer,
    |  requesting_organization_cd character varying(50),
    |  owning_organization_id integer,
    |  owning_organization_cd character varying(50)
  """.stripMargin


// write the dataFrame
sourceDF1
  .write
  .option("createTableColumnTypes", targetTableSchema )
  .mode(saveMode = "Overwrite")
  .option("truncate", "true")
  .jdbc(targetDbJdbcUrl, targetTable, targetDbParms)


Thanks!
Gangadhar Kadam
Sr. Data Engineer
M + 1 (401) 588 2269

Re: Creating JDBC source table schema (DDL) dynamically

Thakrar, Jayesh
One option is to use plain JDBC to interrogate the PostgreSQL catalog for the source table and generate the DDL for the destination table.
Then, using plain JDBC again, create the table at the destination.

See the link below for some pointers:

https://stackoverflow.com/questions/2593803/how-to-generate-the-create-table-sql-statement-for-an-existing-table-in-postgr
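
A rough, untested sketch of building that column-type string from information_schema.columns over plain JDBC (the exact query and the length handling are a starting point, not a drop-in solution):

// Sketch: build the same kind of column-type string you are writing by hand,
// straight from the PostgreSQL catalog of the source database.
import java.sql.DriverManager
import scala.collection.mutable.ListBuffer

def columnTypesFromCatalog(jdbcUrl: String, user: String, password: String,
                           schema: String, table: String): String = {
  val conn = DriverManager.getConnection(jdbcUrl, user, password)
  try {
    val stmt = conn.prepareStatement(
      """SELECT column_name, data_type, character_maximum_length
        |FROM information_schema.columns
        |WHERE table_schema = ? AND table_name = ?
        |ORDER BY ordinal_position""".stripMargin)
    stmt.setString(1, schema)
    stmt.setString(2, table)
    val rs = stmt.executeQuery()
    val cols = ListBuffer[String]()
    while (rs.next()) {
      val name     = rs.getString("column_name")
      val dataType = rs.getString("data_type")
      val maxLen   = rs.getObject("character_maximum_length")
      // Only length-qualified types (character varying, character) carry a max length
      val colType  = if (maxLen != null) s"$dataType($maxLen)" else dataType
      cols += s"$name $colType"
    }
    cols.mkString(",\n")
  } finally conn.close()
}

You could pull the user/password from the same properties file you already load, and pass the result to .option("createTableColumnTypes", ...).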


Re: Creating JDBC source table schema (DDL) dynamically

Kadam, Gangadhar (GE Aviation, Non-GE)
Thanks Jayesh.

I was aware of the catalog-table approach, but I was avoiding it because I would hit the database twice for each table: once to build the DDL and once to read the data. I have a lot of tables to transport from one environment to the other and I don't want to create unnecessary load on the DB.
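
The only no-second-hit alternative I could think of is deriving the types from the DataFrame's own schema after the read, roughly like the sketch below, but Spark's JDBC read gives me plain string columns with no length, so the varchar(n) qualifiers are lost, which is exactly what I want to keep:

// Sketch: map the DataFrame schema back to Postgres-style type names
// (no extra DB hit, but the original varchar lengths are not recoverable).
import org.apache.spark.sql.types._

def columnTypesFromSchema(schema: StructType): String =
  schema.fields.map { f =>
    val pgType = f.dataType match {
      case IntegerType    => "integer"
      case LongType       => "bigint"
      case DoubleType     => "double precision"
      case d: DecimalType => s"numeric(${d.precision},${d.scale})"
      case BooleanType    => "boolean"
      case DateType       => "date"
      case TimestampType  => "timestamp"
      case StringType     => "character varying" // original length is lost here
      case other          => other.simpleString
    }
    s"${f.name} $pgType"
  }.mkString(",\n")

// val targetTableSchema = columnTypesFromSchema(sourceDF.schema)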


Re: Creating JDBC source table schema (DDL) dynamically

Thakrar, Jayesh
Unless the tables are very small (< 1000 rows), the impact of hitting the catalog tables is negligible.
Furthermore, the catalog tables (or views) are usually already in memory, because they are needed for query compilation, query execution (triggers, referential integrity, etc.) and even to establish a connection.

Re: Creating JDBC source table schema (DDL) dynamically

Kadam, Gangadhar (GE Aviation, Non-GE)
Ok. Thanks.
