If you need a result set and a ref cursor won't do, then you are looking at a datatype called sys.anydataset. That is, you seem to want a pipelined function, but with a regular pipelined function you have to define the output structure up front, and in your case that structure is not static.
Enter anydataset. This type lets us generate types dynamically on the fly (at hard parse time only), so that we can define pipelined functions with varying outputs.
Unfortunately, the coding is a bit complex.
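To make "generating types on the fly" a little more concrete, here is a minimal sketch of building a transient object type with ANYTYPE and reading its metadata back. The attribute names ID and CREATED and all the local variable names are made up for illustration, and the precision, scale and length arguments are assumptions; the full solution takes those values from a DBMS_SQL describe instead.

```sql
set serveroutput on

declare
  l_type      anytype;
  l_typecode  pls_integer;
  l_prec      pls_integer;
  l_scale     pls_integer;
  l_len       pls_integer;
  l_csid      pls_integer;
  l_csfrm     pls_integer;
  l_schema    varchar2(128);
  l_name      varchar2(128);
  l_version   varchar2(128);
  l_attr_cnt  pls_integer;
begin
  -- Start a transient object type; nothing is declared with CREATE TYPE here.
  anytype.begincreate(dbms_types.typecode_object, l_type);

  -- Hypothetical attributes: a NUMBER(10,0) and a DATE.
  l_type.addattr('ID',      dbms_types.typecode_number, 10,   0,    null, null, null);
  l_type.addattr('CREATED', dbms_types.typecode_date,   null, null, null, null, null);

  l_type.endcreate;

  -- For an object type, the last OUT argument of GetInfo is the attribute count.
  l_typecode := l_type.getinfo(l_prec, l_scale, l_len, l_csid, l_csfrm,
                               l_schema, l_name, l_version, l_attr_cnt);

  dbms_output.put_line('attribute count: ' || l_attr_cnt);
end;
/
```

The same three calls (BeginCreate, AddAttr, EndCreate) are what the describe step of the solution uses, just driven by the columns of the incoming query rather than by hard-coded attributes.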
To start with, we define a type that handles the processing of the SQL statement passed in.
SQL> create type dyn_pipeline as object
(
  atype anytype,

  static function ODCITableDescribe(rtype out anytype,
                                    stmt  in  varchar2)
    return number,

  static function ODCITablePrepare(sctx    out dyn_pipeline,
                                   tf_info in  sys.ODCITabfuncinfo,
                                   stmt    in  varchar2)
    return number,

  static function ODCITableStart(sctx in out dyn_pipeline,
                                 stmt in     varchar2)
    return number,

  member function ODCITablefetch(self  in out dyn_pipeline,
                                 nrows in     number,
                                 rws   out    anydataset)
    return number,

  member function ODCITableClose(self in dyn_pipeline)
    return number
);
/
Next, we create a package spec that will basically be your querydb function call:
SQL> create package pkg_pipeline
as

  /*
   * Global Types
   */
  -- Describe array.
  type dynamic_sql_rec is record(cursor      integer,
                                 column_cnt  pls_integer,
                                 description dbms_sql.desc_tab2,
                                 execute     integer);
  -- Meta data for the ANYTYPE.
  type anytype_metadata_rec is record(precision pls_integer,
                                      scale     pls_integer,
                                      length    pls_integer,
                                      csid      pls_integer,
                                      csfrm     pls_integer,
                                      schema    varchar2(30),
                                      type      anytype,
                                      name      varchar2(30),
                                      version   varchar2(30),
                                      attr_cnt  pls_integer,
                                      attr_type anytype,
                                      attr_name varchar2(128),
                                      typecode  pls_integer);


  /*
   * Global Variables
   */
  -- SQL descriptor.
  r_sql dynamic_sql_rec;

  /*
   * function will run the given SQL
   */
  function querydb(p_stmt in varchar2)
    return anydataset pipelined using dyn_pipeline;

end pkg_pipeline;
/
Package created.
The types there just hold some information about the structure of the SQL itself. We will be using DBMS_SQL to describe the input SQL, because it has functions to get the number of columns, the datatypes and so on from any given SQL statement.
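As a standalone illustration of that describe step, the following sketch parses a statement with DBMS_SQL and lists what it finds, without executing it. The sample query and the local variable names are placeholders chosen for this example; the DBMS_SQL calls are the same ones the type body uses.

```sql
set serveroutput on

declare
  l_cursor   integer;
  l_col_cnt  integer;
  l_desc     dbms_sql.desc_tab2;
begin
  l_cursor := dbms_sql.open_cursor;

  -- Parsing is enough to describe a query; it is never executed here.
  dbms_sql.parse(l_cursor,
                 'select dummy, sysdate as today from dual',
                 dbms_sql.native);
  dbms_sql.describe_columns2(l_cursor, l_col_cnt, l_desc);

  for i in 1 .. l_col_cnt
  loop
    -- col_type is the numeric datatype code that the type body maps
    -- to a dbms_types typecode (for example 2 = NUMBER, 12 = DATE).
    dbms_output.put_line(l_desc(i).col_name
                         || ' col_type=' || l_desc(i).col_type
                         || ' max_len='  || l_desc(i).col_max_len);
  end loop;

  dbms_sql.close_cursor(l_cursor);
exception
  when others then
    -- Do not leak the cursor if the statement fails to parse.
    if dbms_sql.is_open(l_cursor) then
      dbms_sql.close_cursor(l_cursor);
    end if;
    raise;
end;
/
```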
The main type body is where the processing takes place:
SQL> create type body dyn_pipeline
as

  /*
   * DESC step. this will be called at hard parse and will create
   * a physical type in the DB Schema based on the select columns.
   */
  static function ODCITableDescribe(rtype out anytype,
                                    stmt  in  varchar2)
    return number
  is

    /* Variables */
    -- Type to hold the dbms_sql info (description)
    r_sql  pkg_pipeline.dynamic_sql_rec;
    -- Type to create (has all the columns) of the sql query.
    t_anyt anytype;
    -- SQL query that will be made up from the 2 passed in queries.
    v_sql  varchar2(32767);

  begin

    /*
     * Parse the SQL and describe its format and structure.
     */
    v_sql := replace(stmt, ';', null);

    -- open, parse and discover all info about this SQL.
    r_sql.cursor := dbms_sql.open_cursor;
    dbms_sql.parse( r_sql.cursor, v_sql, dbms_sql.native );
    dbms_sql.describe_columns2( r_sql.cursor, r_sql.column_cnt, r_sql.description );
    dbms_sql.close_cursor( r_sql.cursor );

    -- Start to create the physical type.
    anytype.BeginCreate( DBMS_TYPES.TYPECODE_OBJECT, t_anyt );

    -- Loop through each attribute and add to the type.
    for i in 1 .. r_sql.column_cnt
    loop

      t_anyt.AddAttr(r_sql.description(i).col_name,
                     case
                       when r_sql.description(i).col_type in (1,96,11,208)
                       then dbms_types.typecode_varchar2
                       when r_sql.description(i).col_type = 2
                       then dbms_types.typecode_number
                       when r_sql.description(i).col_type in (8,112)
                       then dbms_types.typecode_clob
                       when r_sql.description(i).col_type = 12
                       then dbms_types.typecode_date
                       when r_sql.description(i).col_type = 23
                       then dbms_types.typecode_raw
                       when r_sql.description(i).col_type = 180
                       then dbms_types.typecode_timestamp
                       when r_sql.description(i).col_type = 181
                       then dbms_types.typecode_timestamp_tz
                       when r_sql.description(i).col_type = 182
                       then dbms_types.typecode_interval_ym
                       when r_sql.description(i).col_type = 183
                       then dbms_types.typecode_interval_ds
                       when r_sql.description(i).col_type = 231
                       then dbms_types.typecode_timestamp_ltz
                     end,
                     r_sql.description(i).col_precision,
                     r_sql.description(i).col_scale,
                     r_sql.description(i).col_max_len,
                     r_sql.description(i).col_charsetid,
                     r_sql.description(i).col_charsetform );
    end loop;

    t_anyt.EndCreate;

    -- set the output type to our built type.
    ANYTYPE.BeginCreate(dbms_types.TYPECODE_TABLE, rtype);
    rtype.SetInfo(null, null, null, null, null, t_anyt,
                  dbms_types.TYPECODE_OBJECT, 0);
    rtype.EndCreate();

    return ODCIConst.Success;

  end ODCITableDescribe;


  /*
   * PREPARE step. Initialise our type.
   */
  static function ODCITableprepare(sctx    out dyn_pipeline,
                                   tf_info in  sys.ODCITabfuncinfo,
                                   stmt    in  varchar2)
    return number
  is

    /* Variables */
    -- Meta data.
    r_meta pkg_pipeline.anytype_metadata_rec;

  begin

    r_meta.typecode := tf_info.rettype.getattreleminfo(
                         1, r_meta.precision, r_meta.scale, r_meta.length,
                         r_meta.csid, r_meta.csfrm, r_meta.type, r_meta.name
                       );

    sctx := dyn_pipeline(r_meta.type);
    return odciconst.success;

  end;


  /*
   * START step. this is where we execute the cursor prior to fetching from it.
   */
  static function ODCITablestart(sctx in out dyn_pipeline,
                                 stmt in     varchar2)
    return number
  is

    /* Variables */
    r_meta pkg_pipeline.anytype_metadata_rec;
    v_sql  varchar2(32767);
  begin

    v_sql := replace(stmt, ';', null);
    pkg_pipeline.r_sql.cursor := dbms_sql.open_cursor;
    dbms_sql.parse(pkg_pipeline.r_sql.cursor, v_sql, dbms_sql.native);
    dbms_sql.describe_columns2(pkg_pipeline.r_sql.cursor,
                               pkg_pipeline.r_sql.column_cnt,
                               pkg_pipeline.r_sql.description);

    -- define all the columns found to let Oracle know the datatypes.
    for i in 1..pkg_pipeline.r_sql.column_cnt
    loop

      r_meta.typecode := sctx.atype.GetAttrElemInfo(
                           i, r_meta.precision, r_meta.scale, r_meta.length,
                           r_meta.csid, r_meta.csfrm, r_meta.type, r_meta.name
                         );

      case r_meta.typecode
        when dbms_types.typecode_varchar2
        then
          dbms_sql.define_column(pkg_pipeline.r_sql.cursor, i, '', 32767);
        when dbms_types.typecode_number
        then
          dbms_sql.define_column(pkg_pipeline.r_sql.cursor, i, cast(null as number));
        when dbms_types.typecode_date
        then
          dbms_sql.define_column(pkg_pipeline.r_sql.cursor, i, cast(null as date));
        when dbms_types.typecode_raw
        then
          dbms_sql.define_column_raw(pkg_pipeline.r_sql.cursor, i, cast(null as raw), r_meta.length);
        when dbms_types.typecode_timestamp
        then
          dbms_sql.define_column(pkg_pipeline.r_sql.cursor, i, cast(null as timestamp));
        when dbms_types.typecode_timestamp_tz
        then
          dbms_sql.define_column(pkg_pipeline.r_sql.cursor, i, cast(null as timestamp with time zone));
        when dbms_types.typecode_timestamp_ltz
        then
          dbms_sql.define_column(pkg_pipeline.r_sql.cursor, i, cast(null as timestamp with local time zone));
        when dbms_types.typecode_interval_ym
        then
          dbms_sql.define_column(pkg_pipeline.r_sql.cursor, i, cast(null as interval year to month));
        when dbms_types.typecode_interval_ds
        then
          dbms_sql.define_column(pkg_pipeline.r_sql.cursor, i, cast(null as interval day to second));
        when dbms_types.typecode_clob
        then
          case pkg_pipeline.r_sql.description(i).col_type
            when 8
            then
              dbms_sql.define_column_long(pkg_pipeline.r_sql.cursor, i);
            else
              dbms_sql.define_column(pkg_pipeline.r_sql.cursor, i, cast(null as clob));
          end case;
      end case;
    end loop;

    -- execute the SQL.
    pkg_pipeline.r_sql.execute := dbms_sql.execute(pkg_pipeline.r_sql.cursor);

    return odciconst.success;

  end ODCITablestart;


  /*
   * FETCH step.
   */
  member function ODCITablefetch(self  in out dyn_pipeline,
                                 nrows in     number,
                                 rws   out    anydataset)
    return number
  is

    /* Variables */
    -- Buffers to hold values.
    v_vc_col      varchar2(32767);
    v_num_col     number;
    v_date_col    date;
    v_raw_col     raw(32767);
    v_raw_error   number;
    v_raw_len     integer;
    v_int_ds_col  interval day to second;
    v_int_ym_col  interval year to month;
    v_ts_col      timestamp;
    v_tstz_col    timestamp with time zone;
    v_tsltz_col   timestamp with local time zone;
    v_clob_col    clob;
    v_clob_offset integer := 0;
    v_clob_len    integer;
    -- Metadata
    r_meta        pkg_pipeline.anytype_metadata_rec;

  begin

    if dbms_sql.fetch_rows( pkg_pipeline.r_sql.cursor ) > 0
    then

      -- Describe to get number and types of columns.
      r_meta.typecode := self.atype.getinfo(
                           r_meta.precision, r_meta.scale, r_meta.length,
                           r_meta.csid, r_meta.csfrm, r_meta.schema,
                           r_meta.name, r_meta.version, r_meta.attr_cnt
                         );

      anydataset.begincreate(dbms_types.typecode_object, self.atype, rws);
      rws.addinstance();
      rws.piecewise();

      -- loop through each column extracting value.
      for i in 1..pkg_pipeline.r_sql.column_cnt
      loop

        r_meta.typecode := self.atype.getattreleminfo(
                             i, r_meta.precision, r_meta.scale, r_meta.length,
                             r_meta.csid, r_meta.csfrm, r_meta.attr_type,
                             r_meta.attr_name
                           );

        case r_meta.typecode
          when dbms_types.typecode_varchar2
          then
            dbms_sql.column_value(pkg_pipeline.r_sql.cursor, i, v_vc_col);
            rws.setvarchar2(v_vc_col);
          when dbms_types.typecode_number
          then
            dbms_sql.column_value(pkg_pipeline.r_sql.cursor, i, v_num_col);
            rws.setnumber(v_num_col);
          when dbms_types.typecode_date
          then
            dbms_sql.column_value(pkg_pipeline.r_sql.cursor, i, v_date_col);
            rws.setdate(v_date_col);
          when dbms_types.typecode_raw
          then
            dbms_sql.column_value_raw(pkg_pipeline.r_sql.cursor, i, v_raw_col,
                                      v_raw_error, v_raw_len);
            rws.setraw(v_raw_col);
          when dbms_types.typecode_interval_ds
          then
            dbms_sql.column_value(pkg_pipeline.r_sql.cursor, i, v_int_ds_col);
            rws.setintervalds(v_int_ds_col);
          when dbms_types.typecode_interval_ym
          then
            dbms_sql.column_value(pkg_pipeline.r_sql.cursor, i, v_int_ym_col);
            rws.setintervalym(v_int_ym_col);
          when dbms_types.typecode_timestamp
          then
            dbms_sql.column_value(pkg_pipeline.r_sql.cursor, i, v_ts_col);
            rws.settimestamp(v_ts_col);
          when dbms_types.typecode_timestamp_tz
          then
            dbms_sql.column_value(pkg_pipeline.r_sql.cursor, i, v_tstz_col);
            rws.settimestamptz(v_tstz_col);
          when dbms_types.typecode_timestamp_ltz
          then
            dbms_sql.column_value(pkg_pipeline.r_sql.cursor, i, v_tsltz_col);
            rws.settimestampltz(v_tsltz_col);
          when dbms_types.typecode_clob
          then
            case pkg_pipeline.r_sql.description(i).col_type
              when 8
              then
                loop
                  dbms_sql.column_value_long(pkg_pipeline.r_sql.cursor, i, 32767, v_clob_offset,
                                             v_vc_col, v_clob_len);
                  v_clob_col := v_clob_col || v_vc_col;
                  v_clob_offset := v_clob_offset + 32767;
                  exit when v_clob_len < 32767;
                end loop;
              else
                dbms_sql.column_value(pkg_pipeline.r_sql.cursor, i, v_clob_col);
            end case;
            rws.setclob(v_clob_col);
        end case;
      end loop;

      rws.endcreate();

    end if;

    return ODCIConst.Success;

  end;

  /*
   * CLOSE step. close the cursor.
   */
  member function ODCITableClose(self in dyn_pipeline)
    return number
  is


  begin
    dbms_sql.close_cursor( pkg_pipeline.r_sql.cursor );
    pkg_pipeline.r_sql := null;
    return odciconst.success;
  end ODCITableClose;

end;
/
Type body created.
Once that is done, you can run queries such as:
SQL> select * from table(pkg_pipeline.querydb('select * from dual'));
D
-
X
SQL> select * from table(pkg_pipeline.querydb('select * from v$mystat where rownum <= 2'));
       SID STATISTIC#      VALUE
---------- ---------- ----------
       230          0          1
       230          1          1
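Because the column list is worked out when the outer statement is hard parsed, the columns of the inner query should be usable by name in the outer query as well. The following is a sketch only; the filter and the ordering are arbitrary choices made for illustration, and the result depends on your session.

```sql
-- Project, filter and sort on columns that exist only in the inner query text.
select sid, value
from   table(pkg_pipeline.querydb('select * from v$mystat where rownum <= 5'))
where  value > 0
order  by value desc;
```

One thing to keep in mind: the describe step needs to see the statement text at parse time, so pass the SQL as a literal as shown here. As far as I know, non-constant arguments such as bind variables reach ODCITableDescribe as NULL, in which case it has nothing to describe.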