Apache Flink – PyFlink – Scala UDF – How to convert a Scala Map in the Table API?

Original answer from Wei Zhong; I'm just the reporter. Thanks, Wei!

As of Flink 1.11, two approaches work:

  • Current: a DataTypeHint annotation in the UDF definition + a SQL statement to register the UDF
  • Outdated: overriding getResultType in the UDF definition + t_env.register_java_function to register the UDF

Code

Scala UDF

package com.dummy

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

class dummyMap extends ScalarFunction {

  // If the UDF is registered via a SQL statement, you need to add this type hint.
  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {

    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  }

  // If the UDF is registered via the method register_java_function, you need to override this
  // method instead.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    // The type of the return values should be TypeInformation
    Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
  }
}
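
If what you really need is a SQL MAP result rather than a ROW (as the question title suggests), a variant of the UDF can declare that directly. The sketch below is untested and the class name dummyJavaMap is made up for illustration; note that eval must return a java.util.Map, because the planner does not understand Scala collection types.

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

class dummyJavaMap extends ScalarFunction {

  // Hint the result as a SQL MAP type; eval must return a java.util.Map.
  @DataTypeHint("MAP<STRING, STRING>")
  def eval(): java.util.Map[String, String] = {
    val m = new java.util.HashMap[String, String]()
    m.put("s", "foo")
    m.put("t", "bar")
    m
  }
}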

Python code

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# load the scala udf jar file, the path should be modified to yours
# or you can also load the jar file via other approaches
st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")

# register the udf via a SQL DDL statement
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
# or register it via the method register_java_function
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.execute_sql("""create table mySink (
        output_of_my_scala_udf ROW<s STRING,t STRING>
    ) with (
        'connector' = 'print'
    )""")

# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
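
If you prefer to drive the insert from SQL instead of the Table API call above, a small untested variant is to register the source table as a view first (the view name mySource is made up here) and run an INSERT INTO statement:

# register the source table under a name so it can be referenced from SQL
st_env.create_temporary_view("mySource", t)
# run the same query as an INSERT INTO statement and wait for the job to finish
st_env.execute_sql(
    "INSERT INTO mySink SELECT dummyMap() FROM mySource"
).get_job_client().get_job_execution_result().result()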
