1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# (1) Import the SDK, connect to Kubeflow Pipelines, and create the experiment.
# Kubeflow Pipelines SDK modules and the Kubernetes client models.
import kfp
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook
from kfp import compiler
from kubernetes import client as k8s_client

# Name of the experiment that runs submitted from this script belong to.
EXPERIMENT_NAME = 'usr1'

# Client built with the SDK's default connection settings (no arguments given).
client = kfp.Client()

# Experiment handle; its ``id`` is used later when submitting the run.
exp = client.create_experiment(name=EXPERIMENT_NAME)
# (2) Define the component ops and the pipeline.
class validateOp(dsl.ContainerOp):
    """Container op that validates the data in the input file.

    Runs the ``user-validate`` image with ``--inputFilename`` and
    ``--output_dir`` and declares two file outputs: ``more`` (read from
    ``/moreFilePath.txt``) and ``less`` (read from ``/lessFilePath.txt``).
    """

    def __init__(self, output_dir, inputFilename):
        """Configure the validation step.

        Args:
            output_dir: Value passed to the container as ``--output_dir``.
            inputFilename: Value passed to the container as
                ``--inputFilename``.
        """
        # Command-line arguments handed to the container, in flag/value pairs.
        cli_args = [
            '--inputFilename', inputFilename,
            '--output_dir', output_dir,
        ]
        # Output name -> path inside the container that holds its value.
        produced_files = {
            'more': '/moreFilePath.txt',
            'less': '/lessFilePath.txt',
        }
        super().__init__(
            name='validate_number',
            image='192.168.14.54:5000/user-validate:v8',
            arguments=cli_args,
            file_outputs=produced_files,
        )
class MoreThanZeroOp(dsl.ContainerOp):
    """Container op that handles the numbers greater than zero.

    Runs the ``user-morethanzero`` image with ``--output_dir`` and ``--data``.
    """

    def __init__(self, output_dir, data):
        """Configure the step.

        Args:
            output_dir: Value passed to the container as ``--output_dir``.
            data: Value passed to the container as ``--data``.
        """
        # Command-line arguments handed to the container, in flag/value pairs.
        cli_args = [
            '--output_dir', output_dir,
            '--data', data,
        ]
        super().__init__(
            name='MoreThanZero',
            image='192.168.14.54:5000/user-morethanzero:v8',
            arguments=cli_args,
        )
class LessThanZeroOp(dsl.ContainerOp):
    """Container op that handles the numbers less than zero.

    Runs the ``user-lessthanzero`` image with ``--output_dir`` and ``--data``.
    """

    def __init__(self, output_dir, data):
        """Configure the step.

        Args:
            output_dir: Value passed to the container as ``--output_dir``.
            data: Value passed to the container as ``--data``.
        """
        # Command-line arguments handed to the container, in flag/value pairs.
        cli_args = [
            '--output_dir', output_dir,
            '--data', data,
        ]
        super().__init__(
            name='LessThanZero',
            image='192.168.14.54:5000/user-lessthanzero:v8',
            arguments=cli_args,
        )
# Settings of the NFS share that every step of the pipeline mounts.
_USR_PV_NAME = 'usr-pv'
_USR_PV_PATH = '/nfs-pv/usr-pv'
_USR_PV_SERVER = '192.168.14.54'


def _mount_usr_pv(op):
    """Attach the ``usr-pv`` NFS volume to *op* and mount it at its own path.

    A fresh volume and volume-mount object is built on every call, so no
    Kubernetes model object is shared between ops.

    Args:
        op: The container op to add the volume and the volume mount to.

    Returns:
        Whatever the chained ``add_volume(...).add_volume_mount(...)`` call
        returns (the op itself in the kfp v1 DSL).
    """
    volume = k8s_client.V1Volume(
        name=_USR_PV_NAME,
        nfs=k8s_client.V1NFSVolumeSource(
            path=_USR_PV_PATH,
            server=_USR_PV_SERVER,
        ),
    )
    mount = k8s_client.V1VolumeMount(
        mount_path=_USR_PV_PATH,
        name=_USR_PV_NAME,
    )
    return op.add_volume(volume).add_volume_mount(mount)


@dsl.pipeline(
    name='Testpipelines',
    description='shows how to define dsl.Condition.'
)
def ValidateTest():
    """Pipeline: validate ``a.txt``, then run both follow-up steps.

    The validation step declares two outputs, ``more`` and ``less``. The
    ``more`` output is passed as ``--data`` to the MoreThanZero step and the
    ``less`` output to the LessThanZero step. All three steps mount the same
    NFS volume and receive its path as their output directory.
    """
    output_dir = '/nfs-pv/usr-pv'
    inputFilename = 'a.txt'

    validate = _mount_usr_pv(validateOp(output_dir, inputFilename))

    # The follow-up ops are not bound to names: nothing in this function
    # consumes them after they have been created and given the volume.
    _mount_usr_pv(MoreThanZeroOp(output_dir, validate.outputs['more']))
    _mount_usr_pv(LessThanZeroOp(output_dir, validate.outputs['less']))
# (3) Compile the pipeline and submit a run.
# Archive that the compiled pipeline is written to and then submitted from.
package_path = 'test.tar.gz'

# Compile the pipeline function into the package file.
compiler.Compiler().compile(ValidateTest, package_path)

# Submit the package as a run named 'usr' under the experiment created above.
run = client.run_pipeline(exp.id, 'usr', package_path)
|