在上面的文章中,我成功运行了pipelines的简单实例。这个简单的例子没有文件的操作,但是这肯定不符合我们的要求,所以接下来介绍如何运行官网的ML 例子。

这次试用的例子是:KubeFlow pipeline using TFX OSS components

准备工作

由于这个例子使用的镜像,文件都是某歌的,所以我们要想办法把他弄到自己服务器能pull到的地方。

镜像:大家可以在dockerhub上搜索‘zoux’。所有相关的镜像,我都上传到dockerhub上去了。

文件:https://github.com/zoux86/kubeflow/tree/master/taxi-cab-classification

运行代码

代码片段1-定义文件路径和镜像名称

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
EXPERIMENT_NAME = 'TFX16'
OUTPUT_DIR = './output' 
PROJECT_NAME = 'TFX16'
TRAIN_DATA = '/home/zoux/data/taxi-cab-classification/train.csv'
EVAL_DATA = '/home/zoux/data/taxi-cab-classification/eval.csv'
HIDDEN_LAYER_SIZE = '1500'
STEPS = 3000
DATAFLOW_TFDV_IMAGE = '192.168.14.66:5000/ml-pipeline-dataflow-tfdv:v1'
DATAFLOW_TFT_IMAGE = '192.168.14.66:5000/ml-pipeline-dataflow-tft:v1'
DATAFLOW_TFMA_IMAGE = '192.168.14.66:5000/ml-pipeline-dataflow-tfma:v1'
DATAFLOW_TF_PREDICT_IMAGE = '192.168.14.66:5000/ml-pipeline-dataflow-tf-predict:v1'
KUBEFLOW_TF_TRAINER_IMAGE = '192.168.14.66:5000/ml-pipeline-kubeflow-tf-trainer:v1'
KUBEFLOW_DEPLOYER_IMAGE = '192.168.14.66:5000/ml-pipeline-kubeflow-deployer:v1'

注意: (1)这里我使用的都是集群本地仓库中的镜像,这些镜像我都是提前下载好,push到仓库中去的。 (2)我的镜像是自己定义的V1版本,这是我自己build的一个新镜像。和原来某歌的镜像不同在于,我在镜像中创建了一个文件夹’/home/zoux/data/taxi-cab-classification/‘,并将所有需要的文件都放在了这个文件夹中,这样每个启动的每个容器都可以在自己的镜像中使用所需要的文件。

代码片段2-导入python sdk

1
2
3
4
5
6
7
8
import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.gcp as gcp
client = kfp.Client()
from kubernetes import client as k8s_client
exp = client.create_experiment(name=EXPERIMENT_NAME)

注意如果保存说找不到kfp模块,需要在jupyter中再下载一次python SDK。

1
!pip3 install https://storage.googleapis.com/ml-pipeline/release/0.1.3/kfp.tar.gz --upgrade

代码片段3-创建pipelines

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
mport kfp.dsl as dsl
# Below are a list of helper functions to wrap the components to provide a simpler interface for pipeline function.
def dataflow_tf_data_validation_op(inference_data: 'GcsUri', validation_data: 'GcsUri', column_names: 'GcsUri[text/json]', key_columns, project: 'GcpProject', mode, validation_output: 'GcsUri[Directory]', step_name='validation'):
    return dsl.ContainerOp(
        name = step_name,
        image = DATAFLOW_TFDV_IMAGE,
        arguments = [
            '--csv-data-for-inference', inference_data,
            '--csv-data-to-validate', validation_data,
            '--column-names', column_names,
            '--key-columns', key_columns,
            '--project', project,
            '--mode', mode,
            '--output', validation_output,
        ],
        file_outputs = {
            'schema': '/schema.txt',
        }
    )

def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', preprocess_mode, preprocess_module: 'GcsUri[text/code/python]', transform_output: 'GcsUri[Directory]', step_name='preprocess'):
    return dsl.ContainerOp(
        name = step_name,
        image = DATAFLOW_TFT_IMAGE,
        arguments = [
            '--train', train_data,
            '--eval', evaluation_data,
            '--schema', schema,
            '--project', project,
            '--mode', preprocess_mode,
            '--preprocessing-module', preprocess_module,
            '--output', transform_output,
        ],
        file_outputs = {'transformed': '/output.txt'}
    )


def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target: str, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training', use_gpu=False):
    tf_train_op = dsl.ContainerOp(
        name = step_name,
        image = KUBEFLOW_TF_TRAINER_IMAGE,
        arguments = [
            '--transformed-data-dir', transformed_data_dir,
            '--schema', schema,
            '--learning-rate', learning_rate,
            '--hidden-layer-size', hidden_layer_size,
            '--steps', steps,
            '--target', target,
            '--preprocessing-module', preprocess_module,
            '--job-dir', training_output,
        ],
        file_outputs = {'train': '/output.txt'}
    )
    return tf_train_op

def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', analyze_mode, analyze_slice_column, analysis_output: 'GcsUri', step_name='analysis'):
    return dsl.ContainerOp(
        name = step_name,
        image = DATAFLOW_TFMA_IMAGE,
        arguments = [
            '--model', model,
            '--eval', evaluation_data,
            '--schema', schema,
            '--project', project,
            '--mode', analyze_mode,
            '--slice-columns', analyze_slice_column,
            '--output', analysis_output,
        ],
        file_outputs = {'analysis': '/output.txt'}
    )


def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'):
    return dsl.ContainerOp(
        name = step_name,
        image = DATAFLOW_TF_PREDICT_IMAGE,
        arguments = [
            '--data', evaluation_data,
            '--schema', schema,
            '--target', target,
            '--model',  model,
            '--mode', predict_mode,
            '--project', project,
            '--output', prediction_output,
        ],
        file_outputs = {'prediction': '/output.txt'}
    )

def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='deploy'):
    return dsl.ContainerOp(
        name = step_name,
        image = KUBEFLOW_DEPLOYER_IMAGE,
        arguments = [
            '--model-path', model,
            '--server-name', tf_server_name
        ]
    )


# The pipeline definition
@dsl.pipeline(
  name='TFX Taxi Cab Classification Pipeline Example',
  description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
)
def taxi_cab_classification(
    output,
    project,
    column_names=dsl.PipelineParam(name='column-names', value='/home/zoux/data/taxi-cab-classification/column-names.json'),
    key_columns=dsl.PipelineParam(name='key-columns', value='trip_start_timestamp'),
    train=dsl.PipelineParam(name='train', value=TRAIN_DATA),
    evaluation=dsl.PipelineParam(name='evaluation', value=EVAL_DATA),
    validation_mode=dsl.PipelineParam(name='validation-mode', value='local'),
    preprocess_mode=dsl.PipelineParam(name='preprocess-mode', value='local'),
    preprocess_module: dsl.PipelineParam=dsl.PipelineParam(name='preprocess-module', value='/home/zoux/data/taxi-cab-classification/preprocessing.py'),
    target=dsl.PipelineParam(name='target', value='tips'),
    learning_rate=dsl.PipelineParam(name='learning-rate', value=0.1),
    hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value=HIDDEN_LAYER_SIZE),
    steps=dsl.PipelineParam(name='steps', value=STEPS),
    predict_mode=dsl.PipelineParam(name='predict-mode', value='local'),
    analyze_mode=dsl.PipelineParam(name='analyze-mode', value='local'),
    analyze_slice_column=dsl.PipelineParam(name='analyze-slice-column', value='trip_start_hour')):

    # set the flag to use GPU trainer
    use_gpu = False
    
    validation_output = '/nfs-pv/tfx-pv'
    transform_output = '/nfs-pv/tfx-pv'
    training_output = '/nfs-pv/tfx-pv'
    analysis_output = '/nfs-pv/tfx-pv'
    prediction_output = '/nfs-pv/tfx-pv'
    tf_server_name = 'taxi-cab-classification-model-{{workflow.name}}'

    validation = dataflow_tf_data_validation_op(train, evaluation, column_names, key_columns, project, validation_mode, validation_output).add_volume(k8s_client.V1Volume(
        name='tfx-pv',
        nfs=k8s_client.V1NFSVolumeSource(path='/nfs-pv/tfx-pv',server='192.168.14.66'))).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path='/nfs-pv/tfx-pv',name='tfx-pv'))
    
    preprocess = dataflow_tf_transform_op(train, evaluation, validation.outputs['schema'], project, preprocess_mode, preprocess_module, transform_output).add_volume(k8s_client.V1Volume(
        name='tfx-pv',
        nfs=k8s_client.V1NFSVolumeSource(path='/nfs-pv/tfx-pv',server='192.168.14.66'))).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path='/nfs-pv/tfx-pv',name='tfx-pv'))
    
    
    training = tf_train_op(preprocess.output, validation.outputs['schema'], learning_rate, hidden_layer_size, steps, target, preprocess_module, training_output, use_gpu=use_gpu).add_volume(k8s_client.V1Volume(
        name='tfx-pv',
        nfs=k8s_client.V1NFSVolumeSource(path='/nfs-pv/tfx-pv',server='192.168.14.66'))).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path='/nfs-pv/tfx-pv',name='tfx-pv'))
    
    analysis = dataflow_tf_model_analyze_op(training.output, evaluation, validation.outputs['schema'], project, analyze_mode, analyze_slice_column, analysis_output).add_volume(k8s_client.V1Volume(
        name='tfx-pv',
        nfs=k8s_client.V1NFSVolumeSource(path='/nfs-pv/tfx-pv',server='192.168.14.66'))).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path='/nfs-pv/tfx-pv',name='tfx-pv'))
    
    prediction = dataflow_tf_predict_op(evaluation, validation.outputs['schema'], target, training.output, predict_mode, project, prediction_output).add_volume(k8s_client.V1Volume(
        name='tfx-pv',
        nfs=k8s_client.V1NFSVolumeSource(path='/nfs-pv/tfx-pv',server='192.168.14.66'))).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path='/nfs-pv/tfx-pv',name='tfx-pv'))
    
    deploy = kubeflow_deploy_op(training.output, tf_server_name).add_volume(k8s_client.V1Volume(
        name='tfx-pv',
        nfs=k8s_client.V1NFSVolumeSource(path='/nfs-pv/tfx-pv',server='192.168.14.66'))).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path='/nfs-pv/tfx-pv',name='tfx-pv'))

这里需要注意,这个和官网给的例子是不同的,细心一点就可以发现这里没有 “apply(gcp.use_gcp_secret(‘user-gcp-sa’))”。

这是因为上面那句话的作用是使用gs的挂载卷。我们国内服务器是访问不到的。所以我们得使用自己的挂载卷。

说的明白一点就是,我们运作的是一个图,图中每一个节点是一个镜像。pipelines中每一个节点运行完,pod就

会消失,所以该节点的数据也会丢失。所以我们必须让所有节点都能访问一个永远存在的目录。这是我们就需要为

所有节点挂载持久存储卷。这里我使用的是NFS分布式存储系统来当存储卷。

关于如何使用NFS,前面的文章已经有过说明,这里不再累赘。这里直接说明如何让所有的容器都关联一个NFS PV存储卷。

创建pv,pvc

pv.yaml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
apiVersion: v1
kind: PersistentVolume
metadata:
  name: tfx-pv
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 192.168.14.66
    path: /nfs-pv/tfx-pv

pvc.yaml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: tfxclaim
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi

创建好pv,pvc后。运行上面的代码就能跑TFX实例了。

结果如下: image22

这里显示最后几个还有问题是因为最后的服务和分析也是在某歌平台上的,我就没修改了。

到这里至少就知道pipelines能在自己的集群上跑起来。接下来就是自己写镜像,跑自己的代码了。