Import from Elasticsearch (Python)

Elasticsearch

  1. Launch a cluster in your workspace or choose an existing cluster.
  2. Once the new cluster is running, go to the "Libraries" tab of that cluster, click "Install new" -> choose "Maven" -> enter the Maven coordinates org.elasticsearch:elasticsearch-spark-30_2.12:8.4.3 -> click "Install". If you run into errors such as org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version even though the ES connection is verified, consider installing a newer connector version that matches your ES service.
  3. Once the installation has finished, attach this notebook to the cluster and run the write and/or read operations below against your Elasticsearch cluster.

Important: In the following cells, replace <ip-address>, <port>, <ssl>, <hostname> and <index> with your Elasticsearch configuration

%sh 
nc -vz <ip-address> <port>
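The cells below reference connection variables; a minimal setup cell might look like the following (every value is a placeholder you must replace with your own Elasticsearch configuration):

```python
# Hypothetical connection settings -- replace every value with your own
# Elasticsearch configuration before running the cells below.
hostname = "<hostname>"  # Elasticsearch host, e.g. your deployment endpoint
port     = "9200"        # 9200 is the default Elasticsearch HTTP port
ssl      = "true"        # "true" if your cluster requires TLS, else "false"
index    = "<index>"     # name of the target index
```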
people = spark.createDataFrame( [ ("Bilbo",     50), 
                                  ("Gandalf", 1000), 
                                  ("Thorin",   195),  
                                  ("Balin",    178), 
                                  ("Kili",      77),
                                  ("Dwalin",   169), 
                                  ("Oin",      167), 
                                  ("Gloin",    158), 
                                  ("Fili",      82), 
                                  ("Bombur",  None)
                                ], 
                                ["name", "age"] 
                              )
# Overwrite the data each time

# NOTE: We **must** set the es.nodes.wan.only property to 'true' so that the connector talks only to the node(s) listed in `es.nodes`.
#       Without this setting, the connector tries to discover additional ES nodes on the network, which won't work here.
( people.write
  .format( "org.elasticsearch.spark.sql" )
  .option( "es.nodes",   hostname )
  .option( "es.port",    port     )
  .option( "es.net.ssl", ssl      )
  .option( "es.nodes.wan.only", "true" )
  .mode( "overwrite" )
  .save( index )
)
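If your documents have a natural key, the connector can use it as the Elasticsearch document _id via the es.mapping.id option, which makes repeated writes update documents in place. A sketch under the same connection variables (the key column "name" is a hypothetical choice for this dataset):

```python
# Hypothetical placeholders -- replace with your Elasticsearch configuration.
hostname, port, ssl, index = "<hostname>", "<port>", "true", "<index>"

write_options = {
    "es.nodes": hostname,
    "es.port": port,
    "es.net.ssl": ssl,
    "es.nodes.wan.only": "true",
    # Use the "name" column as the document _id so rewrites update in place.
    "es.mapping.id": "name",
}

# On a cluster with the connector installed, the write cell above becomes:
# ( people.write
#   .format("org.elasticsearch.spark.sql")
#   .options(**write_options)
#   .mode("overwrite")
#   .save(index)
# )
```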
# NOTE: We **must** set the es.nodes.wan.only property to 'true' so that the connector talks only to the node(s) listed in `es.nodes`.
#       Without this setting, the connector tries to discover additional ES nodes on the network, which won't work here.
df = (spark.read
      .format( "org.elasticsearch.spark.sql" )
      .option( "es.nodes",   hostname )
      .option( "es.port",    port     )
      .option( "es.net.ssl", ssl      )
      .option( "es.nodes.wan.only", "true" )
      .load( index )
     )

display(df)
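Column filters applied to df are pushed down to Elasticsearch by the connector; you can also pass a raw Elasticsearch query through the es.query option so that only matching documents leave the cluster. A sketch, assuming the same connection variables as above:

```python
import json

# Hypothetical placeholders -- replace with your Elasticsearch configuration.
hostname, port, ssl, index = "<hostname>", "<port>", "true", "<index>"

read_options = {
    "es.nodes": hostname,
    "es.port": port,
    "es.net.ssl": ssl,
    "es.nodes.wan.only": "true",
    # Only fetch documents whose age is at least 100.
    "es.query": json.dumps({"query": {"range": {"age": {"gte": 100}}}}),
}

# On a cluster with the connector installed:
# elders = (spark.read
#           .format("org.elasticsearch.spark.sql")
#           .options(**read_options)
#           .load(index))
```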
# Save the results as a Delta table (replace table_name with your target table name)
df.write.format("delta").saveAsTable(table_name)