Expand the Livy test to manipulate the metadata fields and submit the PySpark job using the verification script that checks Livy is up and running. (#1131)
prince-cs committed Feb 16, 2024
1 parent f7b42b0 commit 654e791
Showing 2 changed files with 43 additions and 0 deletions.
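
The new test case drives the init action through cluster metadata instead of the defaults. Outside the test framework, the same setup corresponds roughly to the following gcloud invocation; the cluster name, region, bucket path, and image version are placeholders, not values from this commit:

# Hypothetical stand-alone equivalent of what the new test automates:
# create a cluster with the livy.sh init action and override the
# Livy/Scala versions via metadata (CLUSTER, REGION, BUCKET are placeholders).
gcloud dataproc clusters create CLUSTER \
    --region=REGION \
    --image-version=2.1-debian11 \
    --initialization-actions=gs://BUCKET/livy/livy.sh \
    --metadata=livy-version=0.8.0,scala-version=2.12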
8 changes: 8 additions & 0 deletions livy/livy.sh
@@ -107,6 +107,14 @@ function main() {

wget -nv --timeout=30 --tries=5 --retry-connrefused "${LIVY_URL}" -P "${temp}"

local OS
OS=$(. /etc/os-release && echo "${ID}")
if [[ "${OS}" == "rocky" ]]; then
yum install -y unzip
else
apt-get install -y unzip
fi

unzip -q "${temp}/${LIVY_PKG_NAME}.zip" -d /usr/local/lib/
ln -s "/usr/local/lib/${LIVY_PKG_NAME}" "${LIVY_DIR}"

35 changes: 35 additions & 0 deletions livy/test_livy.py
@@ -11,6 +11,12 @@ class LivyTestCase(DataprocTestCase):
COMPONENT = 'livy'
INIT_ACTIONS = ['livy/livy.sh']
TEST_SCRIPT_FILE_NAME = 'verify_livy_running.py'
DEFAULT_LIVY_VERSION = '0.7.1'
DEFAULT_SCALA_VERSION = '2.11'
LIVY_VERSION = '0.8.0'
SCALA_VERSION = '2.12'
PYTHON2_VERSION = 'python2.7'
PYTHON3_VERSION = 'python3'

def _verify_instance(self, name):
self.upload_test_file(
@@ -28,6 +34,19 @@ def _run_python_test_file(self, name):
self.assert_instance_command(
name, "sudo python3 {}".format(self.TEST_SCRIPT_FILE_NAME))

def __submit_pyspark_job(self, cluster_name):
if self.getImageVersion() >= pkg_resources.parse_version("2.1"):
python_version = self.PYTHON3_VERSION
else:
python_version = self.PYTHON2_VERSION
self.assert_dataproc_job(cluster_name, 'pyspark',
'{}/{}/{} --properties=spark.pyspark.python={},spark.pyspark.driver.python={}'
.format(self.INIT_ACTIONS_REPO,
self.COMPONENT,
self.TEST_SCRIPT_FILE_NAME,
python_version,
python_version))

@parameterized.parameters(
("SINGLE", ["m"]),
("STANDARD", ["m"]),
@@ -42,6 +61,22 @@ def test_livy(self, configuration, machine_suffixes):
self._verify_instance("{}-{}".format(self.getClusterName(),
machine_suffix))

@parameterized.parameters(
"SINGLE",
"STANDARD",
"HA",
)
def test_livy_job(self, configuration):
if self.getImageVersion() < pkg_resources.parse_version("2.0"):
self.skipTest("Not supported in 1.5 images")

if self.getImageVersion() >= pkg_resources.parse_version("2.0"):
metadata = 'livy-version={},scala-version={}'.format(self.LIVY_VERSION, self.SCALA_VERSION)
else:
metadata = 'livy-version={},scala-version={}'.format(self.DEFAULT_LIVY_VERSION, self.DEFAULT_SCALA_VERSION)
self.createCluster(configuration, self.INIT_ACTIONS, metadata=metadata)
self.__submit_pyspark_job(self.getClusterName())


if __name__ == '__main__':
absltest.main()
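
For reference, __submit_pyspark_job goes through the test framework's assert_dataproc_job helper; the job it submits is roughly equivalent to the following gcloud command on a 2.1+ image (cluster name, region, and script location are placeholders):

# Hypothetical stand-alone equivalent of __submit_pyspark_job: run the
# verification script as a PySpark job, pinning driver and executor Python.
gcloud dataproc jobs submit pyspark \
    gs://BUCKET/livy/verify_livy_running.py \
    --cluster=CLUSTER \
    --region=REGION \
    --properties=spark.pyspark.python=python3,spark.pyspark.driver.python=python3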
